GitHub user StefanRRichter opened a pull request: https://github.com/apache/flink/pull/2512
Partitionable op state

This pull request introduces rescalable non-partitioned operator state as described in issue [FLINK-4379] and makes the following major changes:

# 1) Introducing interfaces `PartitionableStateBackend` and `SnapshotProvider`

For rescaling purposes, non-partitioned state is now stored in a `PartitionableStateBackend`, which provides a method to register operator states under unique names and store state partitions in a list state:

```
<S> ListState<S> getPartitionableState(
        String name,
        Collection<PartitionableOperatorStateHandle> restoreSnapshots,
        TypeSerializer<S> partitionStateSerializer) throws Exception;
```

Note that a `TypeSerializer` is provided on a per-state basis so that we can maintain backward compatibility if the serialization format changes.

Furthermore, we introduce `SnapshotProvider`, which offers a method to create snapshots:

```
RunnableFuture<S> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory) throws Exception;
```

`DefaultPartitionableStateBackend` is an implementation of both `PartitionableStateBackend` and `SnapshotProvider`, where the first interface gives a view that is exposed to user code and the latter interface is only used by the system to trigger snapshots. Similar to other state backends (e.g. `KeyedStateBackend`), a `DefaultPartitionableStateBackend` can be created and restored through the class `AbstractStateBackend`:

```
public DefaultPartitionableStateBackend createPartitionableStateBackend(
        Environment env,
        String operatorIdentifier)

public DefaultPartitionableStateBackend restorePartitionableStateBackend(
        Environment env,
        String operatorIdentifier,
        Collection<PartitionableOperatorStateHandle> restoreSnapshots)
```

# 2) Interface `PartitionableCheckpointed` as replacement for `Checkpointed`

User code can use rescalable non-partitioned state by implementing the `PartitionableCheckpointed` interface.
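As a rough illustration of what such user code might look like, the sketch below uses simplified stand-in interfaces, not the real Flink classes. Only the method names `storeOperatorState`, `restoreOperatorState` and `getPartitionableState` are taken from this description. Everything prefixed with `Sketch`, the in-memory backend, the `OffsetTrackingSource` class and the state name `"offsets"` are hypothetical, and the stand-in `getPartitionableState` omits the restore snapshots and the `TypeSerializer`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for the interfaces named in this description.
interface SketchListState<S> {
    void add(S value);
    Iterable<S> get();
    void clear();
}

interface SketchStateBackend {
    // Assumption: the real method also takes restore snapshots and a TypeSerializer.
    <S> SketchListState<S> getPartitionableState(String name);
}

interface SketchPartitionableCheckpointed {
    void storeOperatorState(long checkpointId, SketchStateBackend backend) throws Exception;
    void restoreOperatorState(SketchStateBackend backend) throws Exception;
}

// In-memory list state, only for this sketch.
class InMemoryListState<S> implements SketchListState<S> {
    private final List<S> items = new ArrayList<>();
    @Override public void add(S value) { items.add(value); }
    @Override public Iterable<S> get() { return items; }
    @Override public void clear() { items.clear(); }
}

// In-memory backend: one list state per registered name.
class InMemoryBackend implements SketchStateBackend {
    private final Map<String, Object> states = new HashMap<>();

    @SuppressWarnings("unchecked")
    @Override
    public <S> SketchListState<S> getPartitionableState(String name) {
        return (SketchListState<S>) states.computeIfAbsent(name, k -> new InMemoryListState<S>());
    }
}

// Hypothetical user function that keeps a list of offsets as non-partitioned state.
class OffsetTrackingSource implements SketchPartitionableCheckpointed {
    final List<Long> offsets = new ArrayList<>();

    @Override
    public void storeOperatorState(long checkpointId, SketchStateBackend backend) {
        SketchListState<Long> state = backend.getPartitionableState("offsets");
        state.clear();
        for (Long offset : offsets) {
            state.add(offset); // each list element is one state partition
        }
    }

    @Override
    public void restoreOperatorState(SketchStateBackend backend) {
        SketchListState<Long> state = backend.getPartitionableState("offsets");
        offsets.clear();
        for (Long offset : state.get()) {
            offsets.add(offset);
        }
    }
}

class PartitionableCheckpointedSketch {
    // Stores the given offsets through one instance and restores them into a fresh one.
    static List<Long> roundTrip(List<Long> input) {
        SketchStateBackend backend = new InMemoryBackend();
        OffsetTrackingSource before = new OffsetTrackingSource();
        before.offsets.addAll(input);
        before.storeOperatorState(1L, backend);

        OffsetTrackingSource after = new OffsetTrackingSource();
        after.restoreOperatorState(backend);
        return after.offsets;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList(10L, 20L, 30L)));
    }
}
```

The point of storing each offset as its own list element is that the elements can later be handed to a different number of subtasks. How the real backend serializes and redistributes them is not shown here.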
This interface is meant to replace the `Checkpointed` interface, which would become deprecated.

```
void storeOperatorState(long checkpointId, PartitionableStateBackend backend) throws Exception;

void restoreOperatorState(PartitionableStateBackend backend) throws Exception;
```

Through the store/restore methods, user code can interact with `PartitionableStateBackend` and register/restore states as previously explained. Methods from this interface are invoked in `StreamTask::performCheckpoint(...)` and `StreamTask::restoreState()` respectively.

# 3) Interface `StateRepartitioner`

This interface allows implementing repartitioning strategies that redistribute non-partitioned state when the parallelism changes. Currently, there is only one default strategy implemented in `RoundRobinStatePartitioner`, but in general it is possible to implement different strategies, e.g. a `StatePartitioner` that distributes the union of the states of all previous parallel subtasks to each parallel subtask. This interface is used in `CheckpointCoordinator::restoreLatestCheckpointedState()`.

TODO: After review, a description of the new features must still be added to the Flink documentation.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink partitionable-op-state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2512.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2512

----
commit d133b948b8a201178e4ad3afa49e715bebf8eb5f
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-31T21:59:27Z

    State backend infrastructure to support partitionable op state.
    Introducing CheckpointStateHandles as container for all checkpoint related state handles

    tmp repartitioning function wip

commit 7213938be4bd706b57128c49cb24b24450ea1c87
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-09-07T16:52:48Z

    Add task chain length to TaskState

commit b639c8c62f64f840b95cd6191d7adf514eecddc6
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-09-08T21:11:11Z

    Forward partitionable operator state to PartitionableCheckpointed operators

commit 090d27f6daa173b43f1c5d2ee19cc661359b925a
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-09-08T22:05:13Z

    Make Kafka sources re-partitionable by using the partitionable non-keyed state

    Allow Kafka sources to use repartitionable state

    Remove CheckpointedAsynchronously from FlinkKafkaConsumerBase

commit bc964d7ad6c1608d8bae0db67efdb70c4c278086
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-09-09T13:34:38Z

    Make Kafka producer re-partitionable by removing the Checkpointed interface

commit 2366feeff842c3db39625da5a5912a98e83692db
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-09-09T17:02:32Z

    Fix HeapKeyedStateBackend.restorePartitionedState

commit e71cd8629ac1f195a53749d3303a7acf3cc4e4dd
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-09-15T16:42:55Z

    Partitionable State temporary WIP

commit e84998d2702b9461f59df14c5aa1bb8d7438457a
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-09-16T12:22:47Z

    Introduce partitionable state in savepoint serializer

commit a50629cb8eb876dfcd1af2199e4f4c08bc8562ad
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-09-16T12:37:34Z

    Using known chain length to replace maps with list of known size

commit 2921b478cbc9cadc2c205d8ee66c9f66c11245f1
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-09-16T17:56:02Z

    Added test case for rescaling partitionable state

commit 26b48fedc648715e0ed3e5a4642bede91c86ff02
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-09-19T10:16:26Z

    Code
    cleanup, minor refinements and documentation

----
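To illustrate the round-robin redistribution mentioned in section 3, here is a minimal sketch of one plausible reading of that strategy: the state partitions of all previous subtasks are flattened in order and dealt out one by one to the new subtasks. The class and method names are hypothetical, and the actual `RoundRobinStatePartitioner` in this pull request may work on state handles and differ in detail.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; not the RoundRobinStatePartitioner from this pull request.
class RoundRobinRepartitionSketch {

    /**
     * Flattens the state partitions of all previous subtasks (in subtask order)
     * and assigns them round-robin to newParallelism subtasks.
     */
    static <S> List<List<S>> repartition(List<List<S>> previousSubtaskStates, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<List<S>> result = new ArrayList<>(newParallelism);
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0; // index of the new subtask that receives the next partition
        for (List<S> subtaskState : previousSubtaskStates) {
            for (S partition : subtaskState) {
                result.get(next).add(partition);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Scale up from 2 to 3 subtasks.
        List<List<String>> previous = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d"));
        System.out.println(repartition(previous, 3));
    }
}
```

A union strategy, as mentioned in section 3, would instead give every new subtask a copy of the full flattened list.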