[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15865167#comment-15865167 ] ASF GitHub Bot commented on FLINK-5023: --- Github user shixiaogang closed the pull request at: https://github.com/apache/flink/pull/2768 > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15865166#comment-15865166 ] ASF GitHub Bot commented on FLINK-5023: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 Closing the pull request because the state descriptors have now been refactored with the introduction of composite serializers (see [FLINK-5790](https://issues.apache.org/jira/browse/FLINK-5790)).
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15853735#comment-15853735 ] ASF GitHub Bot commented on FLINK-5023: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 @aljoscha That way, it would be very confusing that a `ReadableState` is not a `State`. That is why I made `State` read-only and introduced the `UpdatableState` interface, which extends `State` with the `clear()` method. These changes (mainly the introduction of the `get()` method) are intended to remove duplicated code. As they have little to do with the implementation of map states, I think it's okay not to change these interfaces now. But I would like to rethink the state hierarchy in the near future, because there is too much duplicated code at the moment.
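For readers following the thread, the hierarchy described in this comment might look roughly like the sketch below. It is only an illustration under stated assumptions: the interfaces are simplified stand-ins rather than the code from the pull request, and `InMemoryValueState` and `HierarchyDemo` are hypothetical names that ignore keys, namespaces and backends.

```java
// Simplified stand-ins for illustration only; these are not Flink's actual interfaces.

/** Read-only base: the only operation is reading the value under the current key. */
interface State<T> {
    T get();
}

/** Adds mutation on top of the read-only base, so read-only states need not offer clear(). */
interface UpdatableState<T> extends State<T> {
    void clear();
}

/** A value state in this hierarchy; update() is specific to value states. */
interface ValueState<T> extends UpdatableState<T> {
    void update(T value);
}

/** Hypothetical in-memory implementation holding a single value (no keying). */
final class InMemoryValueState<T> implements ValueState<T> {
    private T value;

    @Override
    public T get() {
        return value;
    }

    @Override
    public void update(T newValue) {
        value = newValue;
    }

    @Override
    public void clear() {
        value = null;
    }
}

final class HierarchyDemo {
    public static void main(String[] args) {
        InMemoryValueState<String> state = new InMemoryValueState<>();
        state.update("hello");

        // Code that only reads can depend on the read-only base type.
        State<String> readOnlyView = state;
        System.out.println(readOnlyView.get());

        state.clear();
        System.out.println(readOnlyView.get());
    }
}
```

In this arrangement every state is a `State`, and only states that can be modified expose `clear()`.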
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15852832#comment-15852832 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2768

In fact, my original suggestion was to add a new interface `ReadableState`:

```
interface ReadableState<T> {
  T get();
}
```

and leave all the existing interfaces as they are. That way we would have the least amount of changes in existing code and still have a common interface for state that can be read. (Note that `ReadableState` does not extend `State` on purpose, because of `State.clear()`.)
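To make the alternative concrete, here is a minimal sketch of how a separate `ReadableState` could sit next to an unchanged `State`. Everything except the two interface shapes quoted in the comment is an assumption made for illustration: `SimpleValueState`, `FixedReadableState` and `ReadableStateDemo` are hypothetical, and how existing Flink states would actually adopt the interface is not specified in the thread.

```java
// Simplified stand-ins for illustration only; these are not Flink's actual interfaces.

/** Existing base interface, left unchanged: every State can be cleared. */
interface State {
    void clear();
}

/** The new, independent interface from the suggestion. */
interface ReadableState<T> {
    T get();
}

/** Assumption: a mutable state opts in by implementing both interfaces. */
final class SimpleValueState<T> implements State, ReadableState<T> {
    private T value;

    void update(T newValue) {
        value = newValue;
    }

    @Override
    public T get() {
        return value;
    }

    @Override
    public void clear() {
        value = null;
    }
}

/** Assumption: a read-only state implements only ReadableState and has no clear(). */
final class FixedReadableState<T> implements ReadableState<T> {
    private final T value;

    FixedReadableState(T value) {
        this.value = value;
    }

    @Override
    public T get() {
        return value;
    }
}

final class ReadableStateDemo {
    /** Code that only reads can accept either kind of state. */
    static <T> T read(ReadableState<T> state) {
        return state.get();
    }

    public static void main(String[] args) {
        SimpleValueState<Integer> mutable = new SimpleValueState<>();
        mutable.update(42);
        System.out.println(read(mutable));
        System.out.println(read(new FixedReadableState<>("broadcast")));
    }
}
```

The trade-off raised in the follow-up comment is visible here: `FixedReadableState` is readable but is not a `State`.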
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15852487#comment-15852487 ] ASF GitHub Bot commented on FLINK-5023: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768

@StephanEwen Thanks a lot for your comments.

**Removing `clear()` from `State`**

This change was suggested by @aljoscha, who wants to let broadcast states share the same interface (see the discussion in [FLINK-5023](https://issues.apache.org/jira/browse/FLINK-5023)). As mentioned there, broadcast states are read-only in some cases, so the suggestion is not to provide the `clear()` method in the base `State` interface.

**Changing the `State` interface**

The `State` interface is typed because I want to provide the `get()` method for all states, so that we can retrieve the data in the state (under the current key for keyed states). This functionality is already provided by all states except `ValueState`, which provides it through the `value()` method instead. Providing the method for all states can help reduce some duplicated code in the implementation. It also makes sense for the read-only states mentioned above.
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15840004#comment-15840004 ] ASF GitHub Bot commented on FLINK-5023: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2768 Update: Just saw that you already did the migration support. Will merge the `StateDescriptor` refactoring. Let's discuss the other two types of changes separately.
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15837881#comment-15837881 ] ASF GitHub Bot commented on FLINK-5023: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2768

Hi @shixiaogang! I went through this pull request and below are a few thoughts. The pull request changes many things together. Some can work, and for others I would suggest doing it differently. Let me know what you think about this:

## Changing the `StateDescriptor`

This is probably a good cleanup. It does currently break backwards compatibility, because Flink 1.1 wrote the state descriptors into the checkpoints. Flink 1.2 and 1.3 do not do that any more, but currently rely on the unchanged StateDescriptor classes for resuming Flink 1.1 savepoints.

We added a way to define "migration" classes, meaning we can store the old StateDescriptor classes in a migration package where they are dynamically loaded only as proxies when resuming a Flink 1.1 savepoint. That way we can change the classes and maintain backwards compatibility.

## Removing `clear()` from `State`

I think it would be nice to keep `clear()` on the base `State` interface. Can you explain why you want to remove it? In my opinion, every state needs to be able to clear, so it makes sense to have this on the base interface.

If this is in preparation for the `MapState`, then the `MapState` can simply override the `clear()` method with different logic.

- I think that the MapState needs to support clear as well, where it deletes the sub-map for the current key.
- On the HeapStateBackend, this is quite easy, when we assume that each (key/namespace) has a map as the value (and the complete map can be dropped).
- For the RocksDBStateBackend, it is a bit more expensive, and would correspond to a range iteration with deletes.

If an application decides that the MapState `clear()` is too expensive, it can decide not to call it. But we should still support it for cases where it is necessary.

## Changing the `State` interface

I would like to not change `State` to `State<T>` in the Flink master now, because it causes warnings in all parts of the code (that suddenly use raw types) and for some user programs. While this would be done for Flink 2.0, it would make merging simpler if we don't change it now.
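The cost difference between the two `clear()` strategies for map state can be sketched as follows. This is a rough model under stated assumptions, not the backends' real code: `HeapMapStateSketch` and `SortedMapStateSketch` are hypothetical classes, namespaces are omitted, a `TreeMap` stands in for a sorted key-value store, and keys are assumed never to contain the `\u0000` separator.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

/** Heap-style layout: one nested map per key, so clear() drops the whole sub-map at once. */
final class HeapMapStateSketch {
    private final Map<String, Map<String, Integer>> table = new HashMap<>();

    void put(String key, String userKey, int value) {
        table.computeIfAbsent(key, k -> new HashMap<>()).put(userKey, value);
    }

    void clear(String key) {
        table.remove(key); // single removal, independent of the sub-map's size
    }

    int size(String key) {
        Map<String, Integer> sub = table.get(key);
        return sub == null ? 0 : sub.size();
    }
}

/** Sorted-store layout: entries live under composite keys in one ordered map. */
final class SortedMapStateSketch {
    private static final char SEP = '\u0000'; // assumed never to occur inside a key
    private final TreeMap<String, Integer> store = new TreeMap<>();

    void put(String key, String userKey, int value) {
        store.put(key + SEP + userKey, value);
    }

    void clear(String key) {
        // Iterate over the range of composite keys that share the prefix and delete each entry.
        String from = key + SEP;
        String to = key + (char) (SEP + 1);
        Iterator<String> it = store.subMap(from, true, to, false).keySet().iterator();
        while (it.hasNext()) {
            it.next();
            it.remove();
        }
    }

    int size(String key) {
        return store.subMap(key + SEP, true, key + (char) (SEP + 1), false).size();
    }
}
```

In the first layout clearing touches one entry; in the second it touches every entry stored under the key, which is why it is described as more expensive.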
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827234#comment-15827234 ] ASF GitHub Bot commented on FLINK-5023: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 Despite the changes in the state descriptors, Flink jobs can now restore from snapshots taken with older versions.
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711289#comment-15711289 ] ASF GitHub Bot commented on FLINK-5023: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 I rebased the branch to resolve the conflicts with the master branch.
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15701547#comment-15701547 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2768 Thanks for updating, @shixiaogang. I'll look at this today! For the change to reducing state it would be better to have this as a separate issue/commit/PR. @fhueske you already opened an issue for this, right? Could you please point us to that?
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15701513#comment-15701513 ] ASF GitHub Bot commented on FLINK-5023: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 I moved the default value from `SimpleStateDescriptor` to `ValueStateDescriptor`. Now only `ValueStateDescriptor`s have default values. The serialization methods may contain some duplicated code, but I think that's acceptable. I also modified the implementation of `HeapReducingState`s: the first value is now copied before being put into the heap. @aljoscha What do you think of these changes?
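The comment does not spell out why the first value is copied. One plausible reading is that the first element added to a reducing state is stored as-is (there is nothing to reduce it with yet), so without a copy the state would share an object with the caller. The sketch below illustrates that reading only; `ReducingStateSketch` is a hypothetical class, and the `copier` function is an assumed stand-in for whatever copy mechanism the real implementation uses.

```java
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;

/** Hypothetical heap-style reducing state for a single key. */
final class ReducingStateSketch<T> {
    private final BinaryOperator<T> reducer;
    private final UnaryOperator<T> copier; // assumed stand-in for a deep-copy facility
    private T current;

    ReducingStateSketch(BinaryOperator<T> reducer, UnaryOperator<T> copier) {
        this.reducer = reducer;
        this.copier = copier;
    }

    void add(T value) {
        if (current == null) {
            // First value: store a copy so later mutation of the caller's object
            // cannot change what is held in the state.
            current = copier.apply(value);
        } else {
            current = reducer.apply(current, value);
        }
    }

    T get() {
        return current;
    }
}

final class ReducingStateDemo {
    public static void main(String[] args) {
        ReducingStateSketch<int[]> sum = new ReducingStateSketch<int[]>(
                (a, b) -> new int[] {a[0] + b[0]},
                a -> a.clone());

        int[] first = {1};
        sum.add(first);
        first[0] = 100; // the caller reuses its object; the stored copy is unaffected
        sum.add(new int[] {2});
        System.out.println(sum.get()[0]);
    }
}
```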
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15692853#comment-15692853 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2768 @shixiaogang you can also do that, but then the machinery in `SimpleStateDescriptor` is only used by `ValueStateDescriptor` because this is really the only descriptor that has a default value. (In my original implementation it was like this and only `ValueStateDescriptor` had the serialisation logic and a default value.)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15692636#comment-15692636 ] ASF GitHub Bot commented on FLINK-5023: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 Oh... I added another field to make the code clearer, but I did not notice the serialization problem. Thanks very much for the reminder. Your solution does work, though the concept of a "defaultValue" in folding states is a little confusing. Another solution is to let `FoldingStateDescriptor` implement its own serialization method, and I prefer this one. What do you think? @aljoscha
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689795#comment-15689795 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/2768 @shixiaogang Thanks for the swift update! The changes look very good now though I found one last thing that could be problematic. This is changing the behaviour of `FoldingState` when it comes to the default value. I'll quickly open a PR that verifies the current behaviour.
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15688809#comment-15688809 ] ASF GitHub Bot commented on FLINK-5023: --- Github user shixiaogang commented on the issue: https://github.com/apache/flink/pull/2768 @aljoscha Thanks for your review. I have updated the PR according to your suggestion.
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687290#comment-15687290 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2768#discussion_r89128914 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/SimpleStateDescriptor.java --- @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import static java.util.Objects.requireNonNull; + +/** + * Base class for the descriptors of simple states. A {@code SimpleStateDescriptor} is used + * for creating partitioned simple states whose values are not composited. + * + * Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}. + * + * @param The type of the value in the state. + * @param The type of the created state. + */ +@PublicEvolving +public abstract class SimpleStateDescriptor> extends StateDescriptor { + private static final long serialVersionUID = 1L; + + /** The serializer for the type. May be eagerly initialized in the constructor, +* or lazily once the type is serialized or an ExecutionConfig is provided. */ + protected TypeSerializer typeSerializer; + + /** The type information describing the value type. Only used to lazily create the serializer +* and dropped during serialization */ + private transient TypeInformation typeInfo; + + /** The default value returned by the state when no other value is bound to a key */ + protected transient T defaultValue; --- End diff -- We might think about renaming this because for `FoldingState` it's not a default value but the initial accumulation value. 
(Just a suggestion, not strictly necessary)
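The naming concern in this review comment can be illustrated with a small sketch of the two roles the same field would play. It is a simplified model, not Flink's implementation: `DefaultingValueState` and `FoldingStateSketch` are hypothetical classes, and the choice that an empty folding state reads as `null` is an assumption of this sketch.

```java
import java.util.function.BiFunction;

/** Value state: the configured value is a fallback returned when nothing has been stored. */
final class DefaultingValueState<T> {
    private final T defaultValue;
    private T value;

    DefaultingValueState(T defaultValue) {
        this.defaultValue = defaultValue;
    }

    T value() {
        return value != null ? value : defaultValue;
    }

    void update(T newValue) {
        value = newValue;
    }
}

/** Folding state: the configured value only seeds the first fold step. */
final class FoldingStateSketch<T, ACC> {
    private final ACC initialValue;
    private final BiFunction<ACC, T, ACC> foldFunction;
    private ACC accumulator;

    FoldingStateSketch(ACC initialValue, BiFunction<ACC, T, ACC> foldFunction) {
        this.initialValue = initialValue;
        this.foldFunction = foldFunction;
    }

    void add(T element) {
        ACC start = (accumulator != null) ? accumulator : initialValue;
        accumulator = foldFunction.apply(start, element);
    }

    /** Assumption in this sketch: returns null until the first element has been added. */
    ACC get() {
        return accumulator;
    }
}
```

Under these assumptions, reading an empty `DefaultingValueState` yields the configured value, while reading an empty `FoldingStateSketch` does not, which is why a single field name covering both can be misleading.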
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687291#comment-15687291 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2768#discussion_r89162070 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java --- @@ -179,12 +179,13 @@ public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception { * * @param stateDescriptor The StateDescriptor that contains the name and type of the *state that is being accessed. +* @param The type of the values in the state. * @param The type of the state. * @return The partitioned state object. * @throws UnsupportedOperationException Thrown, if no partitioned state is available for the * function (function is not part os a KeyedStream). */ -S getPartitionedState(StateDescriptor stateDescriptor); + > S getPartitionedState(StateDescriptor stateDescriptor); --- End diff -- Same thing as elsewhere: we don't need `V`.
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687293#comment-15687293 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2768#discussion_r89160249 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java --- @@ -69,25 +69,26 @@ * @param stateDescriptor The identifier for the state. This contains name and can create a default state value. * @param The type of the namespace. +* @param The type of the values in the state. * @param The type of the state. * * @return A new key/value state backed by this backend. * * @throws Exception Exceptions may occur during initialization of the state and should be forwarded. */ @SuppressWarnings({"rawtypes", "unchecked"}) -S getPartitionedState( + > S getPartitionedState( --- End diff -- I think we don't need the additional `V` parameter. It's never used, and `State` works just as well, while `State<V>` just pretends to add more type safety. Because both ways work, I would prefer to go with the solution that doesn't add additional generic parameters to methods. The same holds for `mergePartitionedStates()`.
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687294#comment-15687294 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2768#discussion_r89129638 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/ValueState.java --- @@ -37,30 +37,24 @@ * @param Type of the value in the state. */ @PublicEvolving -public interface ValueState extends State { - +public interface ValueState extends UpdatableState { /** -* Returns the current value for the state. When the state is not -* partitioned the returned value is the same for all inputs in a given -* operator instance. If state partitioning is applied, the value returned -* depends on the current operator input, as the operator maintains an -* independent state for each partition. -* -* @return The operator state value corresponding to the current input. -* +* Returns the current value for the state. The method performs the same +* functionality as {@link State#get()}. +* * @throws IOException Thrown if the system cannot access the state. */ + @Deprecated --- End diff -- Please also add a `@deprecated` description in the Javadoc, pointing to use `get()`. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. 
The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
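The review request above (pair the `@Deprecated` annotation with a `@deprecated` Javadoc tag that points to `get()`) can be sketched as follows. This is only an illustration: the nested interfaces are simplified stand-ins rather than Flink's actual types, the `IOException` from the diff is left out for brevity, and the `default` method assumes Java 8, which the code base at the time may not have allowed.

```java
final class DeprecatedValueSketch {

    /** Hypothetical base interface carrying the read accessor proposed in this issue. */
    interface State<T> {
        T get();
    }

    /** Hypothetical ValueState that keeps value() only for backwards compatibility. */
    interface ValueState<T> extends State<T> {
        /**
         * Returns the current value for the state.
         *
         * @deprecated Use {@link State#get()} instead.
         */
        @Deprecated
        default T value() {
            // The old accessor simply forwards to the new one.
            return get();
        }
    }

    /** Trivial in-memory implementation, for illustration only. */
    static final class InMemoryValueState<T> implements ValueState<T> {
        private final T current;

        InMemoryValueState(T current) {
            this.current = current;
        }

        @Override
        public T get() {
            return current;
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        ValueState<String> state = new InMemoryValueState<>("hello");
        System.out.println(state.get() + " / " + state.value());
    }
}
```

The annotation makes the compiler warn at call sites, while the Javadoc tag tells readers what to use instead.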
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687289#comment-15687289 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2768#discussion_r89161821 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java --- @@ -487,7 +487,7 @@ protected ProcessingTimeService getProcessingTimeService() { * @throws IllegalStateException Thrown, if the key/value state was already initialized. * @throws Exception Thrown, if the state backend cannot create the key/value state. */ - protected S getPartitionedState(StateDescriptor stateDescriptor) throws Exception { + protected > S getPartitionedState(StateDescriptor stateDescriptor) throws Exception { --- End diff -- Here, the same thing I mentioned on `KeyedStateBackend` holds, we don't need the additional `V` parameter. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687292#comment-15687292 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2768#discussion_r89161552 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/query/AbstractQueryableStateOperator.java --- @@ -28,16 +28,17 @@ /** * Internal operator handling queryable state instances (setup and update). * + * @param Value type * @param State type * @param Input type */ @Internal -abstract class AbstractQueryableStateOperator +abstract class AbstractQueryableStateOperator, IN> --- End diff -- I think here the same thing I mentioned earlier holds, we don't really need the additional `V` parameter. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
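One possible way to avoid the extra `V` parameter mentioned in the two review comments above is a wildcard bound. The sketch below only illustrates that idea. All names are hypothetical, the generic signatures in the quoted diffs are not fully legible here, and the reviewer may have had a different fix in mind.

```java
final class TypeParameterSketch {

    /** Hypothetical state interface that carries its value type. */
    interface State<V> {
        V get();
    }

    /** Variant with an explicit value type parameter next to the state type. */
    static <V, S extends State<V>> S lookupWithValueType(S state) {
        return state;
    }

    /** Variant without it: the wildcard keeps the signature to a single type parameter. */
    static <S extends State<?>> S lookup(S state) {
        return state;
    }

    /** Toy state used by the demo below. */
    static final class Counter implements State<Integer> {
        @Override
        public Integer get() {
            return 42;
        }
    }

    public static void main(String[] args) {
        // The concrete state type is still inferred, so callers lose nothing.
        Counter counter = lookup(new Counter());
        System.out.println(counter.get());
    }
}
```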
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687288#comment-15687288 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2768#discussion_r89129366 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/State.java --- @@ -20,19 +20,30 @@ import org.apache.flink.annotation.PublicEvolving; +import java.io.IOException; + /** * Interface that different types of partitioned state must implement. * * The state is only accessible by functions applied on a KeyedDataStream. The key is * automatically supplied by the system, so the function always sees the value mapped to the * key of the current element. That way, the system can handle stream and state partitioning * consistently together. + * + * @param The type of the values in the state. */ @PublicEvolving -public interface State { - +public interface State { /** -* Removes the value mapped under the current key. +* Returns the current value for the state. When the state is not +* partitioned the returned value is the same for all inputs in a given +* operator instance. If state partitioning is applied, the value returned +* depends on the current operator input, as the operator maintains an +* independent state for each partition. --- End diff -- This is incorrect, we don't maintain state by partition but by key. (This is not your fault, it was always like this on `ValueState` and you copied it from there, I just discovered it now when reading your code.) > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. 
I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
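The correction above (state is kept per key, not per partition) can be illustrated with a toy model. This is not how Flink's backends are implemented. `KeyedValueState` and `setCurrentKey` are hypothetical names, and the only point is that the value returned by `get()` depends on the key of the element currently being processed.

```java
import java.util.HashMap;
import java.util.Map;

final class KeyedStateSketch {

    /** Toy stand-in for keyed state: one value per key, plus a "current key" set by the runtime. */
    static final class KeyedValueState<K, V> {
        private final Map<K, V> valuesByKey = new HashMap<>();
        private K currentKey;

        void setCurrentKey(K key) {
            this.currentKey = key;
        }

        V get() {
            return valuesByKey.get(currentKey);
        }

        void update(V value) {
            valuesByKey.put(currentKey, value);
        }

        void clear() {
            valuesByKey.remove(currentKey);
        }
    }

    public static void main(String[] args) {
        KeyedValueState<String, Integer> count = new KeyedValueState<>();
        // Keys "a" and "b" may be processed by the same parallel instance,
        // yet each key sees only its own value.
        for (String key : new String[] {"a", "b", "a"}) {
            count.setCurrentKey(key);
            Integer old = count.get();
            count.update(old == null ? 1 : old + 1);
        }
        count.setCurrentKey("a");
        System.out.println(count.get()); // 2
        count.setCurrentKey("b");
        System.out.println(count.get()); // 1
    }
}
```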
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15687295#comment-15687295 ] ASF GitHub Bot commented on FLINK-5023: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2768#discussion_r89130528 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapSimpleState.java --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.heap; + +import org.apache.flink.api.common.state.SimpleStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.UpdatableState; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.util.Preconditions; + +import java.util.Map; + +/** + * Heap-backed partitioned states whose values are not composited and is snapshotted into files. --- End diff -- the `is snapshotted into files` portion is not always true, and also not necessary here. 
> Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15653177#comment-15653177 ] Xiaogang Shi commented on FLINK-5023: - [~aljoscha] [~StephanEwen] I have updated the PR. Now, `State` only provides a read-only accessor and a new interface called `UpdatableState` is added. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15649615#comment-15649615 ] Xiaogang Shi commented on FLINK-5023: - Aljoscha Krettek In that case, I think we should provide {{UpdatableState}} instead of {{ReadableState}}. Currently {{State}} is updatable because it has the method {{clear()}}. I would prefer that {{ReadableState}} inherit from {{State}}; if it does not, it is very weird that a readable state is not a state. From this point of view, I think the base {{State}} should be readable, and a new interface {{UpdatableState}} could be provided to abstract those states that allow both reads and writes. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
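The hierarchy proposed in the comment above might look roughly like this. It is a sketch under assumptions: the interfaces are simplified stand-ins for the Flink ones, `SimpleValueState` and `describe` are hypothetical, and only the split between a readable base `State` and an `UpdatableState` that adds `clear()` comes from the discussion.

```java
final class UpdatableStateSketch {

    /** Base state: read-only access to the value under the current key. */
    interface State<T> {
        T get();
    }

    /** States that can also be modified. */
    interface UpdatableState<T> extends State<T> {
        void clear();
    }

    /** Simplified ValueState built on top of UpdatableState. */
    interface ValueState<T> extends UpdatableState<T> {
        void update(T value);
    }

    /** Hypothetical in-memory implementation, for illustration only. */
    static final class SimpleValueState<T> implements ValueState<T> {
        private T value;

        @Override
        public T get() {
            return value;
        }

        @Override
        public void update(T newValue) {
            value = newValue;
        }

        @Override
        public void clear() {
            value = null;
        }
    }

    /** Code that only needs to read can be written against the base interface. */
    static <T> String describe(State<T> readOnly) {
        return "current=" + readOnly.get();
    }

    public static void main(String[] args) {
        SimpleValueState<String> state = new SimpleValueState<>();
        state.update("a");
        System.out.println(describe(state));
        state.clear();
        System.out.println(describe(state));
    }
}
```

With this layout every state is readable by definition, and write access is an additional capability.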
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648122#comment-15648122 ] Aljoscha Krettek commented on FLINK-5023: - [~StephanEwen] [~xiaogang.shi] In fact, I would prefer if we went one step further and have {{ReadableState}} like this: {code} interface ReadableState<T> { T get(); } {code} {{ValueState}} would then be {code} public interface ValueState<T> extends State, ReadableState<T>, OperatorState<T> { // OperatorState is here for backwards compatibility ... } {code} I want a clear separation of capabilities of state accessors because I think that this might become relevant in the future. I even have a concrete use case right now: for FLINK-4940/FLINK-3659 I want to reuse the state interfaces because then users can use the same known state types for interacting with broadcast/global state that they use for accessing keyed state. In that case, users should be allowed read/write access when processing data from the broadcast side while they should only have read access when processing data from non-broadcast inputs (otherwise, the assumption that broadcast state is the same on all parallel subtask instances of an operator can be violated.). For this, users would only be allowed to get a {{ReadableState}} for read access and a full {{ValueState}} (or {{ListState}} or {{ReducingState}}...) for read/write access. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. 
> In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
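The separation of capabilities described above can be sketched as follows. Only `ReadableState`, `State` and `ValueState` come from the comment. `BroadcastValue`, `onBroadcastElement` and `onRegularElement` are hypothetical names, and the `OperatorState` parent is left out to keep the example small.

```java
final class ReadableStateSketch {

    /** Existing base interface: only clear(). */
    interface State {
        void clear();
    }

    /** Separate read capability, independent of State. */
    interface ReadableState<T> {
        T get();
    }

    /** Full read/write accessor. */
    interface ValueState<T> extends State, ReadableState<T> {
        void update(T value);
    }

    /** Hypothetical holder for a broadcast value, for illustration only. */
    static final class BroadcastValue<T> implements ValueState<T> {
        private T value;

        @Override
        public T get() {
            return value;
        }

        @Override
        public void update(T newValue) {
            value = newValue;
        }

        @Override
        public void clear() {
            value = null;
        }
    }

    /** Broadcast input: the function receives the full read/write accessor. */
    static void onBroadcastElement(String newRule, ValueState<String> rule) {
        rule.update(newRule);
    }

    /** Non-broadcast input: the function only sees the read-only view. */
    static String onRegularElement(String element, ReadableState<String> rule) {
        return element + " checked against " + rule.get();
    }

    public static void main(String[] args) {
        BroadcastValue<String> rule = new BroadcastValue<>();
        onBroadcastElement("rule-1", rule);
        System.out.println(onRegularElement("event", rule));
    }
}
```

The restriction here is enforced only by the static type handed to the function. A caller could still downcast, so a real implementation would probably pass a wrapper that exposes `get()` alone.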
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15647572#comment-15647572 ] ASF GitHub Bot commented on FLINK-5023: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2768#discussion_r86987978 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/ListStateDescriptor.java --- @@ -84,21 +138,37 @@ public boolean equals(Object o) { ListStateDescriptor that = (ListStateDescriptor) o; - return serializer.equals(that.serializer) && name.equals(that.name); - + return elemTypeSerializer.equals(that.elemTypeSerializer) && name.equals(that.name); } @Override public int hashCode() { - int result = serializer.hashCode(); + int result = elemTypeSerializer.hashCode(); result = 31 * result + name.hashCode(); return result; } @Override public String toString() { return "ListStateDescriptor{" + - "serializer=" + serializer + + "elem serializer=" + elemTypeSerializer + '}'; } + + // + // Serialization + // + + private void writeObject(final ObjectOutputStream out) throws IOException { + // make sure we have a serializer before the type information gets lost + initializeSerializerUnlessSet(new ExecutionConfig()); + + // write all the non-transient fields + out.defaultWriteObject(); + } + + private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { --- End diff -- I think this method is not necessary, because it only triggers default serialization anyways. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. 
I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
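The point made above (a `readObject` that only triggers default deserialization can be dropped) can be checked with plain Java serialization. In the sketch below, `Descriptor` is a hypothetical stand-in for the state descriptor and a string stands in for the lazily created serializer. The class declares a custom `writeObject` but no `readObject`, and the round trip still restores all non-transient fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class DefaultReadObjectSketch {

    /** Hypothetical descriptor with a custom writeObject and no readObject. */
    static final class Descriptor implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String name;
        private String serializerName; // stands in for a lazily initialized serializer

        Descriptor(String name) {
            this.name = name;
        }

        String getName() {
            return name;
        }

        String getSerializerName() {
            return serializerName;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            // Make sure the lazily created field is set before it is written.
            if (serializerName == null) {
                serializerName = "serializer-for-" + name;
            }
            out.defaultWriteObject();
        }
        // No readObject: default deserialization restores the non-transient fields.
    }

    /** Serializes and deserializes the descriptor in memory. */
    static Descriptor roundTrip(Descriptor original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Descriptor) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Descriptor copy = roundTrip(new Descriptor("counts"));
        System.out.println(copy.getName() + " " + copy.getSerializerName());
    }
}
```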
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15647567#comment-15647567 ] ASF GitHub Bot commented on FLINK-5023: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2768#discussion_r86987689 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSimpleState.java --- @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state; + +import org.apache.flink.api.common.state.SimpleStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.util.Preconditions; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteOptions; + +import java.io.ByteArrayInputStream; +import java.io.IOException; + +/** + * The implementation that stores simple states in RocksDB. + * + * @param The type of the key. + * @param The type of the namespace. 
+ * @param The type of value that the state state stores. + */ +public class RocksDBSimpleState extends AbstractRocksDBState implements State { + + /** State descriptor from which to create this state instance */ + protected final SimpleStateDescriptor> stateDesc; + + /** Serializer for the values */ + protected final TypeSerializer valueSerializer; + + /** +* We disable writes to the write-ahead-log here. We can't have these in the base class +* because JNI segfaults for some reason if they are. +*/ + protected final WriteOptions writeOptions; + + /** +* Creates a new {@code RocksDBSimpleState}. +* +* @param namespaceSerializer The serializer for the namespace. +* @param stateDesc The state identifier for the state. This contains name and can create a default state value. +*/ + public RocksDBSimpleState(ColumnFamilyHandle columnFamily, + TypeSerializer namespaceSerializer, + SimpleStateDescriptor> stateDesc, + RocksDBKeyedStateBackend backend + ) { + super(columnFamily, namespaceSerializer, backend); + + this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor"); + this.valueSerializer = stateDesc.getSerializer(); + + writeOptions = new WriteOptions(); + writeOptions.setDisableWAL(true); + } + + @Override + public V get() { + try { + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); + byte[] valueBytes = backend.db.get(columnFamily, key); + if (valueBytes == null) { + return stateDesc.getDefaultValue(); + } + return valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes))); --- End diff -- Given that this is used only in single threaded settings, we should try to reuse the `DataInputViewStreamWrapper` and `ByteArrayInputStream`. 
> Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi >Assignee: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or m
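The reuse suggested in the review comment above could be approached along these lines. This sketch uses only JDK classes: `DataInputStream` stands in for Flink's `DataInputViewStreamWrapper`, and `ResettableByteArrayInputStream` and `IntReader` are hypothetical helpers. As the comment notes, the approach relies on single-threaded access.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class ReusableInputSketch {

    /** ByteArrayInputStream whose backing array can be swapped, so one instance serves many reads. */
    static final class ResettableByteArrayInputStream extends ByteArrayInputStream {

        ResettableByteArrayInputStream() {
            super(new byte[0]);
        }

        void setBuffer(byte[] data) {
            // buf, pos, mark and count are protected fields of ByteArrayInputStream.
            this.buf = data;
            this.pos = 0;
            this.mark = 0;
            this.count = data.length;
        }
    }

    /** Reads serialized ints while allocating the two stream objects only once. Not thread-safe. */
    static final class IntReader {
        private final ResettableByteArrayInputStream bytes = new ResettableByteArrayInputStream();
        private final DataInputStream view = new DataInputStream(bytes);

        int read(byte[] valueBytes) {
            bytes.setBuffer(valueBytes);
            try {
                return view.readInt();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        IntReader reader = new IntReader();
        System.out.println(reader.read(new byte[] {0, 0, 0, 5}));
        System.out.println(reader.read(new byte[] {0, 0, 1, 0}));
    }
}
```

Compared with creating both streams on every `get()`, this trades a little mutable state for fewer short-lived allocations on a hot path.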
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646299#comment-15646299 ] Xiaogang Shi commented on FLINK-5023: - I have opened a PR: https://github.com/apache/flink/pull/2768/files. Since the implementations of `State` and `StateDescriptor` are closely connected, I also put the code of FLINK-5024 in this PR. Existing code may be affected by the changes in `StateDescriptor` because `StateDescriptor` now accepts only one type argument. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646286#comment-15646286 ] ASF GitHub Bot commented on FLINK-5023: --- GitHub user shixiaogang opened a pull request: https://github.com/apache/flink/pull/2768 [FLINK-5023 & FLINK-5024] Add SimpleStateDescriptor to clarify the concepts
Changes in the definition of `State` and `StateDescriptor`:
- Add `get()` in the `State` interface.
- Remove type serializers of state values from `StateDescriptor`s.
- Add `SimpleStateDescriptor` to simplify the construction of `ValueStateDescriptor`, `ReducingStateDescriptor` and `FoldingStateDescriptor`.
- Changes the definition of `KeyedStateBackend` and `AbstractKeyedStateBackend` accordingly.
- Modify the implementation of `ListStateDescriptor` accordingly.
Changes in HeapStateBackend:
- Let `AbstractHeapState` not implement the `State` interface. The `clear()` method now is removed from `AbstractHeapState`.
- Add `HeapSimpleState` to simplify the implementation of `HeapValueState`, `HeapReducingState` and `HeapFoldingState`.
- Change the implementation of `HeapValueState`, `HeapReducingState` and `HeapFoldingState` accordingly.
Changes in RocksDBStateBackend:
- Let `AbstractRocksDBState` not implement the `State` interface, removing the `clear()` method. Now, `AbstractRocksDBState` does not depend on the types of `State` and `StateDescriptor` any more.
- Add `RocksDBSimpleState` to simplify the implementation of `RocksDBValueState`, `RocksDBReducingState` and `RocksDBFoldingState`.
- Change the implementation of `RocksDBValueState`, `RocksDBReducingState` and `RocksDBFoldingState` accordingly.
Others:
- Update the usage of `State`s in the implementation of window operators.
- Update the usage of `State`s in unit tests.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/alibaba/flink flink-5023 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2768.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2768 commit 007a93b454e693bc3662f540ac5f33e899ce9058 Author: xiaogang.sxg Date: 2016-11-08T02:38:22Z Refactor the interface of State and StateDescriptor > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15644451#comment-15644451 ] Stephan Ewen commented on FLINK-5023: - I agree with [~xiaogang.shi] - do we really need another interface? Can we not just augment the {{State}} interface? This would only be API breaking if we expect users to implement state interfaces themselves, which we do not, I think. > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643925#comment-15643925 ] Xiaogang Shi commented on FLINK-5023: - The only old method affected is the `value()` method in ValueState. All other states have already implemented the `get()` method. We can implement `ValueState#value()` by wrapping the `get()` method to avoid any changes to existing code. The introduction of ReadableState works. But I think the additional interfaces will make the code "verbose" :) > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-5023) Add get() method in State interface
[ https://issues.apache.org/jira/browse/FLINK-5023?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15643720#comment-15643720 ] Aljoscha Krettek commented on FLINK-5023: - I also thought about this. We shouldn't remove the old methods, though, so as to not break existing code. What I wanted to do is add a new interface {{ReadableState}} like this: {code} interface ReadableState<T> extends State { T get(); } {code} What do you think? > Add get() method in State interface > --- > > Key: FLINK-5023 > URL: https://issues.apache.org/jira/browse/FLINK-5023 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing >Reporter: Xiaogang Shi > > Currently, the only method provided by the State interface is `clear()`. I > think we should provide another method called `get()` to return the > structured value (e.g., value, list, or map) under the current key. > In fact, the functionality of `get()` has already been implemented in all > types of states: e.g., `value()` in ValueState and `get()` in ListState. The > modification to the interface can better abstract these states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)