[jira] [Commented] (FLINK-5023) Add get() method in State interface

2017-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15865167#comment-15865167
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user shixiaogang closed the pull request at:

https://github.com/apache/flink/pull/2768


> Add get() method in State interface
> ---
>
> Key: FLINK-5023
> URL: https://issues.apache.org/jira/browse/FLINK-5023
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>
> Currently, the only method provided by the State interface is `clear()`. I 
> think we should provide another method called `get()` to return the 
> structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all 
> types of states: e.g., `value()` in ValueState and `get()` in ListState. The 
> modification to the interface can better abstract these states.
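A minimal sketch of what the proposal could look like, using simplified, hypothetical stand-ins rather than Flink's actual interfaces: `State<T>` gains `get()` next to `clear()`, so a single helper (`StateUtil.dump`, hypothetical) can read a value state and a list state alike. `KeyContext` is an assumption that stands in for the backend's notion of the current key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-ins for the proposal; not Flink's actual interfaces.
interface State<T> {
    /** Returns the structured value (a value, a list, ...) under the current key. */
    T get();

    /** Removes whatever is stored under the current key. */
    void clear();
}

interface ValueState<T> extends State<T> {
    void update(T value);
}

interface ListState<T> extends State<Iterable<T>> {
    void add(T value);
}

/** Hypothetical holder for the current key, shared by all states of an operator. */
final class KeyContext<K> {
    K currentKey;
}

final class HeapValueState<K, T> implements ValueState<T> {
    private final KeyContext<K> ctx;
    private final Map<K, T> values = new HashMap<>();

    HeapValueState(KeyContext<K> ctx) { this.ctx = ctx; }

    @Override public T get() { return values.get(ctx.currentKey); }
    @Override public void update(T value) { values.put(ctx.currentKey, value); }
    @Override public void clear() { values.remove(ctx.currentKey); }
}

final class HeapListState<K, T> implements ListState<T> {
    private final KeyContext<K> ctx;
    private final Map<K, List<T>> lists = new HashMap<>();

    HeapListState(KeyContext<K> ctx) { this.ctx = ctx; }

    // Returns null when nothing is stored under the current key (a choice of this sketch).
    @Override public Iterable<T> get() { return lists.get(ctx.currentKey); }

    @Override public void add(T value) {
        lists.computeIfAbsent(ctx.currentKey, k -> new ArrayList<>()).add(value);
    }

    @Override public void clear() { lists.remove(ctx.currentKey); }
}

final class StateUtil {
    /** Works for every state type, because get() is declared on the base interface. */
    static String dump(State<?> state) {
        return String.valueOf(state.get());
    }
}
```

For example, after `update(42)` under some key, `StateUtil.dump` on the value state returns `"42"`, and on a list state holding 1 and 2 it returns `"[1, 2]"`.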



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2017-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15865166#comment-15865166
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
Closing the pull request because the state descriptors have now been refactored 
with the introduction of composite serializers (see 
[FLINK-5790](https://issues.apache.org/jira/browse/FLINK-5790)).


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2017-02-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853735#comment-15853735
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
@aljoscha With that approach, it would be very confusing that a 
`ReadableState` is not a `State`. Hence I made `State` read-only and introduced 
the `UpdatableState` interface, which extends `State` with the method `clear()`.

These changes (mainly the introduction of the `get()` method) are intended 
to remove duplicated code. As they have little to do with the implementation 
of map states, I think it's okay not to change these interfaces now.

But I would prefer to rethink the state hierarchy in the near future, because 
there is too much duplicated code now.
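The hierarchy described in this comment can be sketched as follows. The names `State` and `UpdatableState` come from the comment; everything else (the broadcast and value implementations, and keeping a single value instead of one per key) is a hypothetical simplification, not code from the pull request.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: State is read-only; UpdatableState adds clear().
interface State<T> {
    /** Reads the state (per key in a real backend; a single value in this sketch). */
    T get();
}

interface UpdatableState<T> extends State<T> {
    void clear();
}

interface ValueState<T> extends UpdatableState<T> {
    void update(T value);
}

/** A read-only broadcast state: still a State, but it offers no clear(). */
final class ReadOnlyBroadcastState<K, V> implements State<Map<K, V>> {
    private final Map<K, V> view;

    ReadOnlyBroadcastState(Map<K, V> contents) {
        // Defensive copy behind an unmodifiable view, so readers cannot mutate it.
        this.view = Collections.unmodifiableMap(new HashMap<>(contents));
    }

    @Override public Map<K, V> get() { return view; }
}

final class SimpleValueState<T> implements ValueState<T> {
    private T value;

    @Override public T get() { return value; }
    @Override public void update(T value) { this.value = value; }
    @Override public void clear() { value = null; }
}
```

Compared with a separate `ReadableState`, this keeps read-only states inside the `State` hierarchy, at the price of changing the existing interfaces.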


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2017-02-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15852832#comment-15852832
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2768
  
In fact, my original suggestion was to add a new interface `ReadableState`:
```
interface ReadableState<T> {
  T get();
}
```

and leave all the existing interfaces as they are. That way we would have 
the least amount of changes in existing code and still have a common interface 
for state that can be read. (Note that `ReadableState` does not extend `State` 
on purpose, because of `State.clear()`.)
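A sketch of this alternative, assuming simplified stand-ins for the existing interfaces: `State` keeps `clear()` and stays unchanged, `ReadableState<T>` is added alongside it, and an existing state type such as `ValueState` can implement both by bridging its `value()` accessor to `get()`. The bridging default method and the `FixedReadableState` and `Readers` classes are hypothetical.

```java
// Hypothetical sketch; simplified stand-ins, not Flink's actual interfaces.
interface State {
    void clear();
}

interface ReadableState<T> {
    T get();
}

interface ValueState<T> extends State, ReadableState<T> {
    T value();                 // existing accessor, left untouched
    void update(T value);

    /** Bridges the existing accessor to the common read interface. */
    @Override
    default T get() {
        return value();
    }
}

/** Read-only state: readable, but deliberately not a State, so it has no clear(). */
final class FixedReadableState<T> implements ReadableState<T> {
    private final T value;

    FixedReadableState(T value) { this.value = value; }

    @Override public T get() { return value; }
}

final class SimpleValueState<T> implements ValueState<T> {
    private T value;

    @Override public T value() { return value; }
    @Override public void update(T value) { this.value = value; }
    @Override public void clear() { value = null; }
}

final class Readers {
    /** Works for any readable state, whether or not it can be cleared. */
    static <T> T readOrDefault(ReadableState<T> state, T fallback) {
        T v = state.get();
        return v == null ? fallback : v;
    }
}
```

This leaves every existing interface intact; the trade-off raised in the reply is that a read-only state is then not a `State`.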


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2017-02-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15852487#comment-15852487
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
@StephanEwen Thanks a lot for your comments. 

**Removing `clear()` from `State`**

This change was suggested by @aljoscha, who wants broadcast states to share 
the same interface (see the discussion in 
[FLINK-5023](https://issues.apache.org/jira/browse/FLINK-5023)). As mentioned 
there, broadcast states are read-only in some cases. Hence the suggestion not 
to provide the `clear()` method in the base `State` interface.

**Changing the `State` interface**

The `State` interface is parameterized by its value type because I want to 
provide the `get()` method for all states, so that we can retrieve the data in 
the state (under the current key for keyed states). This functionality is 
already provided by all states except `ValueState`, which provides the same 
functionality with the `value()` method. Providing the method for all states can 
help reduce some duplicated code in the implementation. It also makes sense for 
the read-only states mentioned above.
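To illustrate the point about duplicated code, here is a hypothetical sketch (not code from the pull request) in which the per-key `get()` and `clear()` logic lives once in an abstract heap base class, and the value and list states only add their own update methods. Managing the key through a plain `setCurrentKey` method is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not Flink's heap state classes.
interface State<T> {
    T get();
    void clear();
}

/** Shared per-key storage: get() and clear() are written once for all state types. */
abstract class AbstractHeapState<K, T> implements State<T> {
    protected final Map<K, T> table = new HashMap<>();
    protected K currentKey;

    void setCurrentKey(K key) { this.currentKey = key; }

    @Override public T get() { return table.get(currentKey); }
    @Override public void clear() { table.remove(currentKey); }
}

/** Only the update logic is specific to a value state. */
final class HeapValueState<K, T> extends AbstractHeapState<K, T> {
    void update(T value) { table.put(currentKey, value); }
}

/** Only the append logic is specific to a list state. */
final class HeapListState<K, T> extends AbstractHeapState<K, List<T>> {
    void add(T value) {
        table.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(value);
    }
}
```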


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2017-01-26 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15840004#comment-15840004
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2768
  
Update: Just saw that you already added the migration support.

Will merge the `StateDescriptor` refactoring. Let's discuss the other two 
types of changes separately.


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2017-01-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837881#comment-15837881
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2768
  
Hi @shixiaogang !

I went through this pull request, and below are a few thoughts. The pull 
request changes many things at once. Some of the changes can work as they are; 
for others I would suggest a different approach.
Let me know what you think about this:

## Changing the `StateDescriptor`

This is probably a good cleanup. However, it currently breaks backwards 
compatibility, because Flink 1.1 wrote the state descriptors into the 
checkpoints. Flink 1.2 and 1.3 do not do that any more, but they currently rely 
on the unchanged `StateDescriptor` classes for resuming Flink 1.1 savepoints.

We added a way to define "migration" classes: the old `StateDescriptor` 
classes can be kept in a migration package, from which they are dynamically 
loaded, only as proxies, when resuming a Flink 1.1 savepoint. That way we can 
change the classes and still maintain backwards compatibility.
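The proxy idea can be illustrated with plain Java serialization. This is a hypothetical sketch, not Flink's actual migration mechanism: an old class (`LegacyValueStateDescriptor`, a made-up name) is kept only so that bytes written by the old version can still be read, and its `readResolve()` hook, a standard `java.io.Serializable` feature, replaces each deserialized legacy object with an instance of the new class (`ValueStateDescriptorV2`, also made up).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch with made-up class names; Flink's real migration code may differ.

/** The new, refactored descriptor class. */
final class ValueStateDescriptorV2 implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;
    final String defaultValue;

    ValueStateDescriptorV2(String name, String defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }
}

/** The old class, kept only so that previously written bytes can still be read. */
final class LegacyValueStateDescriptor implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String name;
    private final String defaultValue;

    LegacyValueStateDescriptor(String name, String defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }

    /** Called by ObjectInputStream after reading; the legacy object acts as a proxy. */
    private Object readResolve() {
        return new ValueStateDescriptorV2(name, defaultValue);
    }
}

final class SerializationUtil {
    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

To simulate an old savepoint, serialize a `LegacyValueStateDescriptor`; deserializing those bytes yields a `ValueStateDescriptorV2` carrying the same name and default value.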

## Removing `clear()` from `State`

I think it would be nice to keep `clear()` on the base `State` interface. 
Can you explain why you want to remove it? In my opinion, every state needs to 
be clearable, so it makes sense to have this on the base interface.

If this is in preparation for the `MapState`, then the `MapState` can 
simply override the `clear()` method with different logic. 
  - I think that the `MapState` needs to support `clear()` as well, where it 
deletes the sub-map for the current key.
  - On the HeapStateBackend, this is quite easy if we assume that each 
(key, namespace) pair has a map as its value (so the complete map can be dropped).
  - For the RocksDBStateBackend, it is a bit more expensive, and would 
correspond to iterating over a key range and deleting each entry.
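The difference in cost between the two backends can be sketched with plain Java collections. Both classes are hypothetical: a `HashMap` of per-(key, namespace) maps stands in for the heap layout, and a `TreeMap` with composite string keys stands in for a sorted key-value store such as RocksDB. No Flink or RocksDB API is used, and the sketch assumes that keys contain neither `/` nor `Character.MAX_VALUE`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the two clear() strategies; not Flink or RocksDB code.

/** Heap layout: one user map per (key, namespace), so clear() drops a single entry. */
final class HeapMapStateSketch<UK, UV> {
    private final Map<String, Map<UK, UV>> table = new HashMap<>();
    String currentKeyAndNamespace;

    void put(UK userKey, UV value) {
        table.computeIfAbsent(currentKeyAndNamespace, k -> new HashMap<>()).put(userKey, value);
    }

    Map<UK, UV> get() { return table.get(currentKeyAndNamespace); }

    /** Drops the complete sub-map of the current (key, namespace) in one operation. */
    void clear() { table.remove(currentKeyAndNamespace); }
}

/** Flat, sorted layout: one entry per (key, namespace, user key). */
final class SortedKvMapStateSketch {
    private final NavigableMap<String, String> store = new TreeMap<>();
    String currentKeyAndNamespace;

    private String prefix() { return currentKeyAndNamespace + "/"; }

    void put(String userKey, String value) { store.put(prefix() + userKey, value); }

    /** Iterates over the key range of the current prefix and deletes entry by entry. */
    int clear() {
        String from = prefix();
        String to = from + Character.MAX_VALUE;   // exclusive upper bound of the prefix range
        Iterator<String> it = store.subMap(from, true, to, false).keySet().iterator();
        int deleted = 0;
        while (it.hasNext()) {
            it.next();
            it.remove();   // one delete per stored user key
            deleted++;
        }
        return deleted;
    }

    int size() { return store.size(); }
}
```

The heap variant's `clear()` does not depend on the size of the user map, while the sorted variant touches every entry under the prefix.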

If an application decides that `MapState`'s `clear()` is too expensive, it 
can decide not to call it. But we should still support it for cases where it is 
necessary.

## Changing the `State` interface

I would prefer not to change `State` to `State<T>` in the Flink master now, 
because it causes warnings in all parts of the code (which suddenly use raw 
types) and in some user programs. 

While this could be done for Flink 2.0, it would make merging simpler if we 
don't change it now.


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2017-01-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827234#comment-15827234
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
Despite the changes in the state descriptors, Flink jobs can now restore 
from snapshots taken with older versions.


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711289#comment-15711289
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
I rebased the branch to resolve the conflicts with the master branch.


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15701547#comment-15701547
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2768
  
Thanks for updating, @shixiaogang. I'll look at this today!

The change to the reducing state would be better as a separate 
issue/commit/PR. @fhueske, you already opened an issue for this, right? 
Could you please point us to it?


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15701513#comment-15701513
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
I moved the default value from `SimpleStateDescriptor` to 
`ValueStateDescriptor`. Now only `ValueStateDescriptor`s have default values. 
The serialization methods may contain some duplicated code, but I think it's 
acceptable.

I also modified the implementation of `HeapReducingState`: the first value 
is now copied before being put into the heap.

@aljoscha What do you think of these changes?
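A hypothetical sketch of why copying the first value matters on the heap: if the state kept the caller's object by reference and the caller later reused or mutated that object, the state would change silently. `copier` stands in for a type serializer's copy method; `Counter` and `HeapReducingStateSketch` are made-up names, not Flink classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

// Hypothetical sketch; `copier` stands in for a type serializer's copy method.

/** A mutable record that user code might reuse between invocations. */
final class Counter {
    long count;

    Counter(long count) { this.count = count; }
}

final class HeapReducingStateSketch<K, T> {
    private final Map<K, T> table = new HashMap<>();
    private final BinaryOperator<T> reducer;
    private final UnaryOperator<T> copier;
    K currentKey;

    HeapReducingStateSketch(BinaryOperator<T> reducer, UnaryOperator<T> copier) {
        this.reducer = reducer;
        this.copier = copier;
    }

    void add(T value) {
        T current = table.get(currentKey);
        if (current == null) {
            // Store a copy: keeping the caller's reference would let later mutations
            // of that (possibly reused) object change the state behind our back.
            table.put(currentKey, copier.apply(value));
        } else {
            table.put(currentKey, reducer.apply(current, value));
        }
    }

    T get() { return table.get(currentKey); }
}
```

If the caller adds a `Counter(1)`, then sets that same object's count to 100 and adds a `Counter(2)`, the state holds 3; without the copy it would hold 102.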






[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15692853#comment-15692853
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2768
  
@shixiaogang You can also do that, but then the machinery in 
`SimpleStateDescriptor` is only used by `ValueStateDescriptor`, because this is 
really the only descriptor that has a default value. (In my original 
implementation it was like this: only `ValueStateDescriptor` had the 
serialisation logic and a default value.)


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15692636#comment-15692636
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
Oh... I added another field to make the code clearer, but I did not 
notice the serialization problem. Thanks very much for the reminder.

Your solution does work, though the concept of a "defaultValue" in folding 
states is a little confusing. Another solution is to let 
`FoldingStateDescriptor` implement its own serialization method, and I prefer 
this one.

What do you think? @aljoscha 
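A sketch of a descriptor that implements its own serialization for the default value, under stated assumptions: the default value is kept in a `transient` field (its type need not be `java.io.Serializable`), and the standard `writeObject`/`readObject` hooks encode it with the descriptor's own serializable value serializer. `ValueSerializer`, `StringSerializer` and `DescriptorWithDefault` are illustrative names, not Flink's `TypeSerializer` or `StateDescriptor`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch; these are not Flink classes.

/** A serializer that is itself Serializable, so it can travel inside the descriptor. */
interface ValueSerializer<T> extends Serializable {
    void serialize(T value, DataOutputStream out) throws IOException;
    T deserialize(DataInputStream in) throws IOException;
}

final class StringSerializer implements ValueSerializer<String> {
    private static final long serialVersionUID = 1L;
    @Override public void serialize(String value, DataOutputStream out) throws IOException { out.writeUTF(value); }
    @Override public String deserialize(DataInputStream in) throws IOException { return in.readUTF(); }
}

final class DescriptorWithDefault<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    final String name;
    final ValueSerializer<T> serializer;
    transient T defaultValue;   // skipped by default serialization, written manually below

    DescriptorWithDefault(String name, ValueSerializer<T> serializer, T defaultValue) {
        this.name = name;
        this.serializer = serializer;
        this.defaultValue = defaultValue;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();   // writes name and serializer
        if (defaultValue == null) {
            out.writeBoolean(false);
            return;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream data = new DataOutputStream(bytes);
        serializer.serialize(defaultValue, data);
        data.flush();
        out.writeBoolean(true);
        out.writeInt(bytes.size());
        out.write(bytes.toByteArray());
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        if (in.readBoolean()) {
            byte[] buffer = new byte[in.readInt()];
            in.readFully(buffer);
            defaultValue = serializer.deserialize(new DataInputStream(new ByteArrayInputStream(buffer)));
        }
    }

    /** Serializes and deserializes a descriptor, e.g. to simulate shipping it with a job. */
    static <T> DescriptorWithDefault<T> roundTrip(DescriptorWithDefault<T> descriptor) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(descriptor);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                DescriptorWithDefault<T> copy = (DescriptorWithDefault<T>) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```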


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689795#comment-15689795
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/2768
  
@shixiaogang Thanks for the swift update! The changes look very good now, 
though I found one last thing that could be problematic: this changes the 
behaviour of `FoldingState` with respect to the default value. I'll quickly 
open a PR that verifies the current behaviour.


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688809#comment-15688809
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user shixiaogang commented on the issue:

https://github.com/apache/flink/pull/2768
  
@aljoscha Thanks for your review. I have updated the PR according to your 
suggestion. 


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687290#comment-15687290
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2768#discussion_r89128914
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/SimpleStateDescriptor.java ---
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Base class for the descriptors of simple states. A {@code SimpleStateDescriptor} is used
+ * for creating partitioned simple states whose values are not composited.
+ *
+ * Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
+ *
+ * @param  The type of the value in the state.
+ * @param  The type of the created state.
+ */
+@PublicEvolving
+public abstract class SimpleStateDescriptor> extends StateDescriptor {
+   private static final long serialVersionUID = 1L;
+
+   /** The serializer for the type. May be eagerly initialized in the constructor,
+* or lazily once the type is serialized or an ExecutionConfig is provided. */
+   protected TypeSerializer typeSerializer;
+
+   /** The type information describing the value type. Only used to lazily create the serializer
+* and dropped during serialization */
+   private transient TypeInformation typeInfo;
+
+   /** The default value returned by the state when no other value is bound to a key */
+   protected transient T defaultValue;
--- End diff --

We might think about renaming this, because for `FoldingState` it's not a 
default value but the initial accumulation value. (Just a suggestion, not 
strictly necessary.)
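To make the naming point concrete, here is a hypothetical sketch (simplified stand-ins, not Flink classes) contrasting the two roles: a value state's default is what `get()` returns when nothing is stored, while a folding state's initial value only seeds the first fold for a key. Returning `null` from the folding state's `get()` for an empty key is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, simplified stand-ins (not Flink classes) for the two meanings of "default".

/** The default value is returned by get() whenever nothing is stored for the key. */
final class ValueStateWithDefault<K, T> {
    private final Map<K, T> table = new HashMap<>();
    private final T defaultValue;
    K currentKey;

    ValueStateWithDefault(T defaultValue) { this.defaultValue = defaultValue; }

    T get() { return table.getOrDefault(currentKey, defaultValue); }

    void update(T value) { table.put(currentKey, value); }
}

/** The initial accumulator only seeds the first fold for a key. */
final class FoldingStateSketch<K, IN, ACC> {
    private final Map<K, ACC> table = new HashMap<>();
    private final ACC initialAccumulator;   // assumed immutable; a mutable one would need copying
    private final BiFunction<ACC, IN, ACC> folder;
    K currentKey;

    FoldingStateSketch(ACC initialAccumulator, BiFunction<ACC, IN, ACC> folder) {
        this.initialAccumulator = initialAccumulator;
        this.folder = folder;
    }

    void add(IN value) {
        ACC acc = table.getOrDefault(currentKey, initialAccumulator);
        table.put(currentKey, folder.apply(acc, value));
    }

    /** In this sketch, an empty key yields null rather than the initial accumulator. */
    ACC get() { return table.get(currentKey); }
}
```

Folding 1 and then 2 into the initial value `""` with string concatenation yields `"12"`, while `get()` before any fold returns `null` here rather than `""`.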


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687291#comment-15687291
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2768#discussion_r89162070
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ---
@@ -179,12 +179,13 @@ public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
 *
 * @param stateDescriptor The StateDescriptor that contains the name and type of the
 *state that is being accessed.
+* @param  The type of the values in the state.
 * @param  The type of the state.
 * @return The partitioned state object.
 * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
 *   function (function is not part os a KeyedStream).
 */
-S getPartitionedState(StateDescriptor stateDescriptor);
+   > S getPartitionedState(StateDescriptor stateDescriptor);
--- End diff --

Same thing as elsewhere: we don't need `V`.


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687293#comment-15687293
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2768#discussion_r89160249
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java ---
@@ -69,25 +69,26 @@
 * @param stateDescriptor The identifier for the state. This contains name and can create a default state value.
 
 * @param  The type of the namespace.
+* @param  The type of the values in the state.
 * @param  The type of the state.
 *
 * @return A new key/value state backed by this backend.
 *
 * @throws Exception Exceptions may occur during initialization of the state and should be forwarded.
 */
@SuppressWarnings({"rawtypes", "unchecked"})
-S getPartitionedState(
+   > S getPartitionedState(
--- End diff --

I think we don't need the additional `V` parameter. It's never used, and 
`State<?>` works just as well, while `State<V>` just pretends to add more type 
safety. Because both ways work, I would prefer to go with the solution that 
doesn't add additional generic parameters to methods.

Same holds for `mergePartitionedStates()`.
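A hypothetical sketch of why the extra `V` adds nothing, using simplified types that are not Flink's (`Descriptor` loosely mirrors a state descriptor parameterized by state type and value type): both accessor variants accept the same descriptor, and the compiler infers the same state type `S` for the caller either way.

```java
import java.util.function.Supplier;

// Hypothetical, simplified types; not Flink's State, StateDescriptor or KeyedStateBackend.
interface State<T> {
    T get();
    void clear();
}

interface ValueState<T> extends State<T> {
    void update(T value);
}

final class HeapValueState<T> implements ValueState<T> {
    private T value;

    @Override public T get() { return value; }
    @Override public void update(T value) { this.value = value; }
    @Override public void clear() { value = null; }
}

/** Loosely modelled on a state descriptor: S is the state type, T its value type. */
final class Descriptor<S extends State<?>, T> {
    private final Supplier<S> factory;

    Descriptor(Supplier<S> factory) { this.factory = factory; }

    S create() { return factory.get(); }
}

final class Backend {
    /** Variant with the additional V parameter. */
    static <V, S extends State<V>> S getStateWithV(Descriptor<S, V> descriptor) {
        return descriptor.create();
    }

    /** Variant without V: the wildcard suffices, and callers get the same S back. */
    static <S extends State<?>> S getState(Descriptor<S, ?> descriptor) {
        return descriptor.create();
    }
}
```

Given a `Descriptor<ValueState<Integer>, Integer>`, both `Backend.getState(d)` and `Backend.getStateWithV(d)` return a `ValueState<Integer>` without any cast.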


[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687294#comment-15687294
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2768#discussion_r89129638
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java ---
@@ -37,30 +37,24 @@
  * @param <T> Type of the value in the state.
  */
 @PublicEvolving
-public interface ValueState<T> extends State {
-
+public interface ValueState<T> extends UpdatableState<T> {
/**
-* Returns the current value for the state. When the state is not
-* partitioned the returned value is the same for all inputs in a given
-* operator instance. If state partitioning is applied, the value returned
-* depends on the current operator input, as the operator maintains an
-* independent state for each partition.
-* 
-* @return The operator state value corresponding to the current input.
-* 
+* Returns the current value for the state. The method performs the same
+* functionality as {@link State#get()}.
+*
 * @throws IOException Thrown if the system cannot access the state.
 */
+   @Deprecated
--- End diff --

Please also add a `@deprecated` description in the Javadoc that points users 
to `get()`.
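
One possible shape for the requested Javadoc, sketched under the assumption that `value()` simply delegates to `get()` (as Xiaogang Shi suggests elsewhere in this thread). The interfaces are simplified and hypothetical (the PR's `ValueState` extends `UpdatableState`, which is omitted here), and the delegation uses a Java 8 default method for brevity, which the code base of that time may not have permitted.

```java
import java.io.IOException;

// Hypothetical, simplified versions of the interfaces under review; a sketch only.
interface State<T> {
    /**
     * Returns the value of the state under the current key.
     *
     * @throws IOException Thrown if the system cannot access the state.
     */
    T get() throws IOException;
}

interface ValueState<T> extends State<T> {

    /**
     * Returns the current value for the state.
     *
     * @throws IOException Thrown if the system cannot access the state.
     * @deprecated Use {@link State#get()} instead. This method is kept only
     *             for backwards compatibility and performs the same lookup.
     */
    @Deprecated
    default T value() throws IOException {
        // Delegate to the new accessor so that existing callers keep working.
        return get();
    }

    void update(T value) throws IOException;
}

// Minimal in-memory implementation used to exercise the sketch.
final class InMemoryValueState<T> implements ValueState<T> {
    private T value;

    @Override
    public T get() {
        return value;
    }

    @Override
    public void update(T value) {
        this.value = value;
    }
}
```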




[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687289#comment-15687289
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2768#discussion_r89161821
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 ---
@@ -487,7 +487,7 @@ protected ProcessingTimeService getProcessingTimeService() {
 * @throws IllegalStateException Thrown, if the key/value state was already initialized.
 * @throws Exception Thrown, if the state backend cannot create the key/value state.
 */
-   protected <S extends State> S getPartitionedState(StateDescriptor stateDescriptor) throws Exception {
+   protected <V, S extends State<V>> S getPartitionedState(StateDescriptor stateDescriptor) throws Exception {
--- End diff --

The same point I made about `KeyedStateBackend` holds here: we don't 
need the additional `V` parameter.




[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687292#comment-15687292
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2768#discussion_r89161552
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java
 ---
@@ -28,16 +28,17 @@
 /**
  * Internal operator handling queryable state instances (setup and update).
  *
+ * @param <V>  Value type
  * @param <S>  State type
  * @param <IN> Input type
  */
 @Internal
-abstract class AbstractQueryableStateOperator<S extends State, IN>
+abstract class AbstractQueryableStateOperator<V, S extends State<V>, IN>
--- End diff --

I think the same point I made earlier holds here: we don't really need the 
additional `V` parameter.




[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687288#comment-15687288
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2768#discussion_r89129366
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/State.java ---
@@ -20,19 +20,30 @@
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import java.io.IOException;
+
 /**
  * Interface that different types of partitioned state must implement.
  *
  * The state is only accessible by functions applied on a KeyedDataStream. The key is
  * automatically supplied by the system, so the function always sees the value mapped to the
  * key of the current element. That way, the system can handle stream and state partitioning
+ *
+ * @param  The type of the values in the state.
  */
 @PublicEvolving
-public interface State {
-
+public interface State {
/**
-* Removes the value mapped under the current key.
+* Returns the current value for the state. When the state is not
+* partitioned the returned value is the same for all inputs in a given
+* operator instance. If state partitioning is applied, the value returned
+* depends on the current operator input, as the operator maintains an
+* independent state for each partition.
--- End diff --

This is incorrect: we don't maintain state per partition but per key. (This 
is not your fault; the text has always been like this on `ValueState` and you 
copied it from there. I only noticed it now while reading your code.)
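
A toy model may make the distinction concrete. This is a minimal sketch, assuming a single parallel operator instance (that is, one partition) that receives elements with different keys; `ToyKeyedValueState` is a hypothetical class and not part of Flink. Within the same partition, each key sees its own value, which is why "per key" is the accurate description.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of keyed value state inside ONE parallel operator
// instance; Flink's real state backends are far more involved.
final class ToyKeyedValueState<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private final V defaultValue;
    private K currentKey;

    ToyKeyedValueState(V defaultValue) {
        this.defaultValue = defaultValue;
    }

    // In Flink the runtime sets the current key from the element being
    // processed; here the caller does it explicitly.
    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Returns the value mapped to the current key, not to the partition.
    V get() {
        return valuesByKey.getOrDefault(currentKey, defaultValue);
    }

    void update(V value) {
        valuesByKey.put(currentKey, value);
    }

    // Removes the value mapped under the current key only.
    void clear() {
        valuesByKey.remove(currentKey);
    }
}
```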




[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687295#comment-15687295
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2768#discussion_r89130528
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSimpleState.java
 ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.state.SimpleStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.UpdatableState;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+
+/**
+ * Heap-backed partitioned states whose values are not composited and is snapshotted into files.
--- End diff --

The `is snapshotted into files` part is not always true, and it is not 
necessary here either.




[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-09 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653177#comment-15653177
 ] 

Xiaogang Shi commented on FLINK-5023:
-

[~aljoscha] [~StephanEwen] I have updated the PR. Now `State` only provides a 
read-only accessor, and a new interface, `UpdatableState`, has been added.



[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-08 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649615#comment-15649615
 ] 

Xiaogang Shi commented on FLINK-5023:
-

Aljoscha Krettek, in that case I think we should provide {{UpdatableState}} 
instead of {{ReadableState}}.

Currently {{State}} is updatable because it has the method {{clear()}}, which is 
why {{ReadableState}} does not inherit from {{State}} in your proposal. I would 
prefer that {{ReadableState}} inherit from {{State}}; otherwise, it's very weird 
that a readable state is not a state.

From this point of view, I think the base {{State}} should be readable, and a 
new interface {{UpdatableState}} could be provided to abstract those states 
that allow both reads and writes.
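
A minimal sketch of the hierarchy proposed here, assuming that `clear()` moves to `UpdatableState` (the comment does not list the exact members of each interface, so everything below, including `SketchValueState` and `ReadOnlyConsumer`, is illustrative only).

```java
// Hypothetical sketch of the proposed hierarchy: the base State is
// read-only, and UpdatableState adds writes. Members are assumptions.
interface State<T> {
    // Read-only access to the value under the current key.
    T get();
}

interface UpdatableState<T> extends State<T> {
    // Write access: removes the value under the current key.
    void clear();
}

interface ValueState<T> extends UpdatableState<T> {
    void update(T value);
}

// Minimal non-keyed implementation, just enough to exercise the types.
final class SketchValueState<T> implements ValueState<T> {
    private T value;

    @Override
    public T get() {
        return value;
    }

    @Override
    public void update(T value) {
        this.value = value;
    }

    @Override
    public void clear() {
        value = null;
    }
}

final class ReadOnlyConsumer {
    // Code that receives a plain State<T> can read but has no write methods.
    static <T> String describe(State<T> state) {
        return "current value: " + state.get();
    }
}
```

The read-only guarantee is purely static: code holding a `State<T>` has no write methods to call, but nothing prevents a deliberate downcast.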



[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-08 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648122#comment-15648122
 ] 

Aljoscha Krettek commented on FLINK-5023:
-

[~StephanEwen] [~xiaogang.shi] In fact, I would prefer if we went one step 
further and had {{ReadableState}} like this:

{code}
interface ReadableState<T> {
  T get();
}
{code}

{{ValueState}} would then be
{code}
public interface ValueState<T> extends State, ReadableState<T>, OperatorState<T> { // OperatorState is here for backwards compatibility
...
}
{code}

I want a clear separation of capabilities of state accessors because I think 
that this might become relevant in the future. I even have a concrete use case 
right now: for FLINK-4940/FLINK-3659 I want to reuse the state interfaces 
because then users can use the same known state types for interacting with 
broadcast/global state that they use for accessing keyed state. In that case, 
users should be allowed read/write access when processing data from the 
broadcast side while they should only have read access when processing data 
from non-broadcast inputs (otherwise, the assumption that broadcast state is 
the same on all parallel subtask instances of an operator could be violated).

For this, users would only be allowed to get a {{ReadableState}} for read 
access and a full {{ValueState}} (or {{ListState}} or {{ReducingState}}...) for 
read/write access.
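
The broadcast use case can be sketched as follows, assuming a hypothetical two-input function (`BroadcastJoinSketch`, with made-up method names) that hands the broadcast side a full `ValueState` and the other input only a `ReadableState` view. The `OperatorState` parent from the proposal is omitted, and none of these types are Flink's actual interfaces.

```java
// Hypothetical sketch of the capability split described above; these are
// not Flink's interfaces, and the two-input "function" is simplified.
interface State {
    void clear();
}

interface ReadableState<T> {
    T get();
}

interface ValueState<T> extends State, ReadableState<T> {
    void update(T value);
}

final class SharedValueState<T> implements ValueState<T> {
    private T value;

    @Override
    public T get() {
        return value;
    }

    @Override
    public void update(T value) {
        this.value = value;
    }

    @Override
    public void clear() {
        value = null;
    }
}

final class BroadcastJoinSketch {
    private final SharedValueState<String> rule = new SharedValueState<>();

    // Broadcast side: receives the full read/write ValueState.
    void processBroadcastElement(String newRule) {
        ValueState<String> state = rule;
        state.update(newRule);
    }

    // Non-broadcast side: the handle is typed as ReadableState, so calling
    // update() or clear() here would not compile.
    String processElement(String element) {
        ReadableState<String> state = rule;
        return element + " matched against " + state.get();
    }
}
```

Because the non-broadcast path only sees `ReadableState<String>`, an accidental write there fails to compile, which is the property needed to keep broadcast state identical across parallel instances.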



[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15647572#comment-15647572
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2768#discussion_r86987978
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java
 ---
@@ -84,21 +138,37 @@ public boolean equals(Object o) {
 
ListStateDescriptor that = (ListStateDescriptor) o;
 
-   return serializer.equals(that.serializer) && name.equals(that.name);
-
+   return elemTypeSerializer.equals(that.elemTypeSerializer) && name.equals(that.name);
}
 
@Override
public int hashCode() {
-   int result = serializer.hashCode();
+   int result = elemTypeSerializer.hashCode();
result = 31 * result + name.hashCode();
return result;
}
 
@Override
public String toString() {
return "ListStateDescriptor{" +
-   "serializer=" + serializer +
+   "elem serializer=" + elemTypeSerializer +
'}';
}
+
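+   // ------------------------------------------------------------------------
+   //  Serialization
+   // ------------------------------------------------------------------------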
+
+   private void writeObject(final ObjectOutputStream out) throws IOException {
+   // make sure we have a serializer before the type information gets lost
+   initializeSerializerUnlessSet(new ExecutionConfig());
+
+   // write all the non-transient fields
+   out.defaultWriteObject();
+   }
+
+   private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
--- End diff --

I think this method is not necessary, because it only triggers the default 
deserialization anyway.
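
To see why `writeObject()` is needed but `readObject()` is not, consider this minimal sketch using plain Java serialization. `DescriptorSketch`, `typeHint` and `serializerName` are hypothetical stand-ins for the descriptor, its type information and its serializer. The custom `writeObject()` does real work before calling `defaultWriteObject()`, whereas a `readObject()` that only called `defaultReadObject()` would change nothing, because that is what the JDK does when the method is absent.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical descriptor illustrating the review point; names only loosely
// mirror the diff above.
final class DescriptorSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;

    // Stand-in for type information that is not written to the stream.
    private transient Object typeHint;

    // Stand-in for the serializer derived from the type information.
    private String serializerName;

    DescriptorSketch(String name, Object typeHint) {
        this.name = name;
        this.typeHint = typeHint;
    }

    void initializeSerializerUnlessSet() {
        if (serializerName == null && typeHint != null) {
            serializerName = typeHint.getClass().getSimpleName() + "Serializer";
        }
    }

    String getName() {
        return name;
    }

    String getSerializerName() {
        return serializerName;
    }

    // Needed: derive the serializer before the transient type hint is lost.
    private void writeObject(ObjectOutputStream out) throws IOException {
        initializeSerializerUnlessSet();
        out.defaultWriteObject();
    }

    // No readObject(): one that only called in.defaultReadObject() would
    // behave exactly like the default deserialization.

    static DescriptorSketch roundTrip(DescriptorSketch descriptor)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(descriptor);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DescriptorSketch) in.readObject();
        }
    }
}
```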




[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15647567#comment-15647567
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2768#discussion_r86987689
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSimpleState.java
 ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.state.SimpleStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteOptions;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * The implementation that stores simple states in RocksDB.
+ *
+ * @param <K> The type of the key.
+ * @param <N> The type of the namespace.
+ * @param <V> The type of value that the state state stores.
+ */
+public class RocksDBSimpleState extends AbstractRocksDBState implements State {
+
+   /** State descriptor from which to create this state instance */
+   protected final SimpleStateDescriptor> stateDesc;
+
+   /** Serializer for the values */
+   protected final TypeSerializer valueSerializer;
+
+   /**
+* We disable writes to the write-ahead-log here. We can't have these in the base class
+* because JNI segfaults for some reason if they are.
+*/
+   protected final WriteOptions writeOptions;
+
+   /**
+* Creates a new {@code RocksDBSimpleState}.
+*
+* @param namespaceSerializer The serializer for the namespace.
+* @param stateDesc The state identifier for the state. This contains name and can create a default state value.
+*/
+   public RocksDBSimpleState(ColumnFamilyHandle columnFamily,
+   TypeSerializer namespaceSerializer,
+   SimpleStateDescriptor> stateDesc,
+   RocksDBKeyedStateBackend backend
+   ) {
+   super(columnFamily, namespaceSerializer, backend);
+
+   this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor");
+   this.valueSerializer = stateDesc.getSerializer();
+
+   writeOptions = new WriteOptions();
+   writeOptions.setDisableWAL(true);
+   }
+
+   @Override
+   public V get() {
+   try {
+   writeCurrentKeyWithGroupAndNamespace();
+   byte[] key = keySerializationStream.toByteArray();
+   byte[] valueBytes = backend.db.get(columnFamily, key);
+   if (valueBytes == null) {
+   return stateDesc.getDefaultValue();
+   }
+   return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
--- End diff --

Given that this is used only in single-threaded settings, we should try to 
reuse the `DataInputViewStreamWrapper` and `ByteArrayInputStream`.
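
A minimal sketch of the suggested reuse, written against plain JDK streams rather than Flink's `DataInputViewStreamWrapper` (whose API this thread does not show); `ReusableByteArrayInputStream` and `ReusingIntReader` are hypothetical names. The streams are allocated once and only the backing array is swapped per lookup, which is safe only because access is single-threaded.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical helper: a ByteArrayInputStream whose backing array can be
// swapped, so a single instance can be reused for many reads.
final class ReusableByteArrayInputStream extends ByteArrayInputStream {

    ReusableByteArrayInputStream() {
        super(new byte[0]);
    }

    // buf, pos, count and mark are protected fields of ByteArrayInputStream.
    void setBuffer(byte[] data) {
        this.buf = data;
        this.pos = 0;
        this.count = data.length;
        this.mark = 0;
    }
}

// Sketch of a single-threaded reader that allocates its streams once and
// reuses them for every value, instead of creating new wrappers per call.
final class ReusingIntReader {
    private final ReusableByteArrayInputStream bytes = new ReusableByteArrayInputStream();
    private final DataInputStream in = new DataInputStream(bytes);

    // Not thread-safe: both streams are shared across calls.
    int read(byte[] serialized) throws IOException {
        bytes.setBuffer(serialized);
        return in.readInt();
    }
}
```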



[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-07 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646299#comment-15646299
 ] 

Xiaogang Shi commented on FLINK-5023:
-

I have opened a PR: https://github.com/apache/flink/pull/2768/files. 

Since the implementations of `State` and `StateDescriptor` are closely 
connected, I also included the code for FLINK-5024 in this PR. 

Existing code may be affected by the changes in `StateDescriptor`, because 
`StateDescriptor` now accepts only one type argument.



[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646286#comment-15646286
 ] 

ASF GitHub Bot commented on FLINK-5023:
---

GitHub user shixiaogang opened a pull request:

https://github.com/apache/flink/pull/2768

[FLINK-5023 & FLINK-5024] Add SimpleStateDescriptor to clarify the concepts

Changes in the definition of `State` and `StateDescriptor`:
- Add `get()` in the `State` interface.
- Remove type serializers of state values from `StateDescriptor`s.
- Add `SimpleStateDescriptor` to simplify the construction of 
`ValueStateDescriptor`, `ReducingStateDescriptor` and `FoldingStateDescriptor`.
- Change the definition of `KeyedStateBackend` and 
`AbstractKeyedStateBackend` accordingly.
- Modify the implementation of `ListStateDescriptor` accordingly.

Changes in HeapStateBackend:
- Make `AbstractHeapState` no longer implement the `State` interface. The 
`clear()` method is now removed from `AbstractHeapState`.
- Add `HeapSimpleState` to simplify the implementation of `HeapValueState`, 
`HeapReducingState` and `HeapFoldingState`.
- Change the implementation of `HeapValueState`, `HeapReducingState` and 
`HeapFoldingState` accordingly.

Changes in RocksDBStateBackend:
- Make `AbstractRocksDBState` no longer implement the `State` interface, removing 
the `clear()` method. Now `AbstractRocksDBState` no longer depends on the types 
of `State` and `StateDescriptor`.
- Add `RocksDBSimpleState` to simplify the implementation of 
`RocksDBValueState`, `RocksDBReducingState` and `RocksDBFoldingState`.
- Change the implementation of `RocksDBValueState`, `RocksDBReducingState` 
and `RocksDBFoldingState` accordingly.

Others:
- Update the usage of `State`s in the implementation of window operators.
- Update the usage of `State`s in unit tests.
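
The idea behind `HeapSimpleState` (one shared implementation behind value, reducing and folding states) might look roughly like this. It is a minimal sketch, not the PR's code: the class names, the `BinaryOperator`-based reduce function and the `HashMap` storage are assumptions, and namespaces and snapshotting are ignored.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical sketch: one heap-backed base class keeps a single value per
// key and implements get()/clear(); subclasses only differ in how they write.
abstract class HeapSimpleStateSketch<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private final V defaultValue;
    private K currentKey;

    HeapSimpleStateSketch(V defaultValue) {
        this.defaultValue = defaultValue;
    }

    void setCurrentKey(K key) {
        currentKey = key;
    }

    // Shared read path for all "simple" state types.
    V get() {
        return values.getOrDefault(currentKey, defaultValue);
    }

    // Shared clear path.
    void clear() {
        values.remove(currentKey);
    }

    // Raw access for subclasses; returns null if the key has no value.
    protected V getRaw() {
        return values.get(currentKey);
    }

    protected void put(V value) {
        values.put(currentKey, value);
    }
}

final class HeapValueStateSketch<K, V> extends HeapSimpleStateSketch<K, V> {
    HeapValueStateSketch(V defaultValue) {
        super(defaultValue);
    }

    // Overwrites the value under the current key.
    void update(V value) {
        put(value);
    }
}

final class HeapReducingStateSketch<K, V> extends HeapSimpleStateSketch<K, V> {
    private final BinaryOperator<V> reduceFunction;

    HeapReducingStateSketch(BinaryOperator<V> reduceFunction) {
        super(null);
        this.reduceFunction = reduceFunction;
    }

    // Combines the new value with the current one using the reduce function.
    void add(V value) {
        V current = getRaw();
        put(current == null ? value : reduceFunction.apply(current, value));
    }
}
```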


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/alibaba/flink flink-5023

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2768.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2768


commit 007a93b454e693bc3662f540ac5f33e899ce9058
Author: xiaogang.sxg 
Date:   2016-11-08T02:38:22Z

Refactor the interface of State and StateDescriptor






[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-07 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15644451#comment-15644451
 ] 

Stephan Ewen commented on FLINK-5023:
-

I agree with [~xiaogang.shi] - do we really need another interface? Can we not 
just augment the {{State}} interface?

This would only be API breaking if we expect users to implement state 
interfaces themselves, which we do not, I think.



[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-07 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643925#comment-15643925
 ] 

Xiaogang Shi commented on FLINK-5023:
-

The only old method affected is the `value()` method in ValueState. All other 
states have already implemented the `get()` method. We can implement 
`ValueState#value()` by wrapping the `get()` method to avoid any changes to 
existing code. 

The introduction of ReadableState works. But I think the additional 
interfaces will make the code "verbose" :)



[jira] [Commented] (FLINK-5023) Add get() method in State interface

2016-11-07 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643720#comment-15643720
 ] 

Aljoscha Krettek commented on FLINK-5023:
-

I also thought about this. We shouldn't remove the old methods, though, so as 
to not break existing code.

What I wanted to do is add a new interface {{ReadableState}} like this:

{code}
interface ReadableState<T> extends State {
  T get();
}
{code}

What do you think?
