http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/checkpointing.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/state/checkpointing.md b/docs/dev/stream/state/checkpointing.md new file mode 100644 index 0000000..34c16b0 --- /dev/null +++ b/docs/dev/stream/state/checkpointing.md @@ -0,0 +1,174 @@ +--- +title: "Checkpointing" +nav-parent_id: streaming_state +nav-pos: 2 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* ToC +{:toc} + +Every function and operator in Flink can be **stateful** (see [working with state](state.html) for details). +Stateful functions store data across the processing of individual elements/events, making state a critical building block for +any type of more elaborate operation. + +In order to make state fault tolerant, Flink needs to **checkpoint** the state. Checkpoints allow Flink to recover state and positions +in the streams to give the application the same semantics as a failure-free execution. + +The [documentation on streaming fault tolerance](../../../internals/stream_checkpointing.html) describes in detail the technique behind Flink's streaming fault tolerance mechanism. 
+ + ## Prerequisites + +Flink's checkpointing mechanism interacts with durable storage for streams and state. In general, it requires: + + - A *persistent* (or *durable*) data source that can replay records for a certain amount of time. Examples of such sources are persistent message queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file systems (e.g., HDFS, S3, GFS, NFS, Ceph, ...). + - Persistent storage for state, typically a distributed filesystem (e.g., HDFS, S3, GFS, NFS, Ceph, ...). + + +## Enabling and Configuring Checkpointing + +By default, checkpointing is disabled. To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds. + +Other parameters for checkpointing include: + + - *exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels. + Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently a few milliseconds) applications. + + - *checkpoint timeout*: The time after which a checkpoint in progress is aborted if it has not completed by then. + + - *minimum time between checkpoints*: To make sure that the streaming application makes a certain amount of progress between checkpoints, + one can define how much time needs to pass between checkpoints. If this value is set, for example, to *5000*, the next checkpoint will be + started no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval. + Note that this implies that the checkpoint interval will never be smaller than this parameter.
+ + It is often easier to configure applications by defining the "time between checkpoints" than the checkpoint interval, because the "time between checkpoints" + is not susceptible to the fact that checkpoints may sometimes take longer than average (for example, if the target storage system is temporarily slow). + + Note that this value also implies that the number of concurrent checkpoints is *one*. + + - *number of concurrent checkpoints*: By default, the system will not trigger another checkpoint while one is still in progress. + This ensures that the topology does not spend so much time on checkpoints that it makes no progress processing the streams. + It is possible to allow for multiple overlapping checkpoints, which is useful for pipelines that have a certain processing delay + (for example, because the functions call external services that need some time to respond) but that still want to do very frequent checkpoints + (100s of milliseconds) to re-process very little upon failures. + + This option cannot be used when a minimum time between checkpoints is defined. + + - *externalized checkpoints*: You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their metadata out to persistent storage and are *not* automatically cleaned up when the job fails. This way, you will have a checkpoint around to resume from if your job fails. There are more details in the [deployment notes on externalized checkpoints](../../ops/state/checkpoints.html#externalized-checkpoints).
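As a rough illustration of how the *minimum time between checkpoints* interacts with the checkpoint interval when only one checkpoint runs at a time, the following Java sketch computes the earliest start time of each checkpoint. It is a simplified model under stated assumptions: the first checkpoint starts at t = 0, checkpoints never overlap, and durations are known in advance. The class `CheckpointScheduleSketch` and its methods are hypothetical. This is not how Flink's checkpoint coordinator is implemented.

```java
/**
 * Hypothetical, simplified model of checkpoint scheduling (not Flink code).
 * Assumes at most one checkpoint in progress at a time.
 */
final class CheckpointScheduleSketch {

    /**
     * Earliest start time (ms) of the next checkpoint: one interval after the previous
     * trigger, but never sooner than minPause after the previous checkpoint completed.
     * With minPause = 0 this still waits for completion (one concurrent checkpoint).
     */
    static long nextTriggerTime(long lastTriggerTime, long lastCompletionTime,
                                long interval, long minPause) {
        long byInterval = lastTriggerTime + interval;
        long byPause = lastCompletionTime + minPause;
        return Math.max(byInterval, byPause);
    }

    /**
     * Start times of a sequence of checkpoints with the given durations (ms),
     * assuming the first one starts at t = 0.
     */
    static long[] simulate(long[] durations, long interval, long minPause) {
        long[] triggers = new long[durations.length];
        long trigger = 0;
        for (int i = 0; i < durations.length; i++) {
            triggers[i] = trigger;
            long completion = trigger + durations[i];
            trigger = nextTriggerTime(trigger, completion, interval, minPause);
        }
        return triggers;
    }

    public static void main(String[] args) {
        // interval = 1000 ms, minimum pause = 500 ms; the second checkpoint is slow (1800 ms)
        long[] starts = simulate(new long[] {200, 1800, 100}, 1000, 500);
        System.out.println(java.util.Arrays.toString(starts)); // prints [0, 1000, 3300]
    }
}
```

In this model, the slow second checkpoint completes at 2800 ms. The 500 ms pause therefore pushes the third start from 2000 ms to 3300 ms, which leaves the job time to make progress. With a pause of 5000 ms and an interval of 1000 ms, consecutive starts are always at least 5000 ms apart. This matches the note that the effective interval is never smaller than the minimum pause.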
+ +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + +// start a checkpoint every 1000 ms +env.enableCheckpointing(1000); + +// advanced options: + +// set mode to exactly-once (this is the default) +env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + +// make sure 500 ms of progress happen between checkpoints +env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); + +// checkpoints have to complete within one minute, or are discarded +env.getCheckpointConfig().setCheckpointTimeout(60000); + +// allow only one checkpoint to be in progress at the same time +env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); + +// enable externalized checkpoints which are retained after job cancellation +env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment + +// start a checkpoint every 1000 ms +env.enableCheckpointing(1000) + +// advanced options: + +// set mode to exactly-once (this is the default) +env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) + +// make sure 500 ms of progress happen between checkpoints +env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) + +// checkpoints have to complete within one minute, or are discarded +env.getCheckpointConfig.setCheckpointTimeout(60000) + +// allow only one checkpoint to be in progress at the same time +env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) + +// enable externalized checkpoints which are retained after job cancellation +env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) +{% endhighlight %} +</div> +</div> + +### Related Config Options + +Some more parameters and/or defaults may be set via `conf/flink-conf.yaml` (see [configuration]({{ site.baseurl }}/ops/config.html) for a full guide): + +- `state.backend`: The backend that will be
used to store operator state checkpoints if checkpointing is enabled. Supported backends: + - `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging. + - `filesystem`: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. All filesystems supported by Flink can be used, for example HDFS, S3, ... + +- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink-supported filesystem. Note: the directory must be accessible from the JobManager; use `file://` only for local setups. + +- `state.backend.rocksdb.checkpointdir`: The local directory for storing RocksDB files, or a list of directories separated by the system's directory delimiter (for example `:` (colon) on Linux/Unix). (Default: `taskmanager.tmp.dirs`) + +- `state.checkpoints.dir`: The target directory for metadata of [externalized checkpoints]({{ site.baseurl }}/ops/state/checkpoints.html#externalized-checkpoints). + +- `state.checkpoints.num-retained`: The number of completed checkpoint instances to retain. Having more than one allows recovery to fall back to an earlier checkpoint if the latest checkpoint is corrupt. (Default: 1) + +{% top %} + + +## Selecting a State Backend + +Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html) stores consistent snapshots +of all the state in timers and stateful operators, including connectors, windows, and any [user-defined state](state.html). +Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured +**State Backend**. + +By default, state is kept in memory in the TaskManagers and checkpoints are stored in memory in the JobManager. For proper persistence of large state, +Flink supports various approaches for storing and checkpointing state in other state backends.
+The choice of state backend can be configured via `StreamExecutionEnvironment.setStateBackend(…)`. + +See [state backends]({{ site.baseurl }}/ops/state/state_backends.html) for more details on the available state backends and options for job-wide and cluster-wide configuration. + + +## State Checkpoints in Iterative Jobs + +Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. To force checkpointing on an iterative program, the user needs to set a special flag when enabling checkpointing: `env.enableCheckpointing(interval, force = true)`. + +Please note that records in flight in the loop edges (and the state changes associated with them) will be lost upon failure. + +{% top %} + + +## Restart Strategies + +Flink supports different restart strategies which control how the jobs are restarted in case of a failure. For more +information, see [Restart Strategies]({{ site.baseurl }}/dev/restart_strategies.html). + +{% top %} +
http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/custom_serialization.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/state/custom_serialization.md b/docs/dev/stream/state/custom_serialization.md new file mode 100644 index 0000000..fbb7b83 --- /dev/null +++ b/docs/dev/stream/state/custom_serialization.md @@ -0,0 +1,188 @@ +--- +title: "Custom Serialization for Managed State" +nav-title: "Custom Serialization" +nav-parent_id: streaming_state +nav-pos: 10 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +If your application uses Flink's managed state, it might be necessary to implement custom serialization logic for special use cases. + +This page is a guideline for users who require custom serialization for their state. It covers how +to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using +Flink's own serializers, this page is irrelevant and can be skipped.
+ +### Using custom serializers + +As demonstrated in the examples of [Working with State](state.html), when registering a managed operator or keyed state, a `StateDescriptor` is required +to specify the state's name, as well as information about the type of the state. The type information is used by Flink's +[type serialization framework](../../types_serialization.html) to create appropriate serializers for the state. + +It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, +simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...} + +ListStateDescriptor<Tuple2<String, Integer>> descriptor = + new ListStateDescriptor<>( + "state-name", + new CustomTypeSerializer()); + +checkpointedState = getRuntimeContext().getListState(descriptor); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...} + +val descriptor = new ListStateDescriptor[(String, Integer)]( + "state-name", + new CustomTypeSerializer) + +checkpointedState = getRuntimeContext.getListState(descriptor) +{% endhighlight %} +</div> +</div> + +Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following +subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using +anonymous classes as your state serializers.
The generated class names of anonymous classes are not guaranteed. They vary across compilers and depend on the order in which the anonymous classes appear within the enclosing class, which can +easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the +classpath). + +### Handling serializer upgrades and compatibility + +Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any +specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer +that comes with the `StateDescriptor` used to access the state in the restored job) will be checked for compatibility +and will then replace the previous serializer for the state. + +A compatible serializer is one that is capable of reading previously serialized bytes of the state +and that writes the state in an identical binary format. The means to check the new serializer's compatibility +is provided through the following two methods of the `TypeSerializer` class: + +{% highlight java %} +public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); +public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); +{% endhighlight %} + +In short, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a +point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the +checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot +will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify +compatibility of the new serializer.
This method serves as a check for whether or not the new serializer is compatible, +as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible. + +Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e., when the +same serializer is used for the state in the restored job, the serializers will reconfigure themselves to be compatible +with their previous configuration. + +The following subsections provide guidelines for implementing these two methods when using custom serializers. + +#### Implementing the `snapshotConfiguration` method + +The serializer's configuration snapshot should capture enough information such that on restore, the information +carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. +This could typically contain information about the serializer's parameters or binary format of the serialized data; +generally, anything that allows the new serializer to decide whether or not it can be used to read previously serialized +bytes, and that it writes in the same binary format. + +How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. Below +is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`. + +{% highlight java %} +public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { + public abstract int getVersion(); + public void read(DataInputView in) throws IOException {...} + public void write(DataOutputView out) throws IOException {...} +} +{% endhighlight %} + +The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base +implementations contain logic to read and write the version of the configuration snapshot, so they should be extended and +not completely overridden.
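The version-first contract of `read` and `write` can be sketched without any Flink dependency. In the following Java example, everything is hypothetical. `VersionedSnapshotSketch` only imitates the role described here for `TypeSerializerConfigSnapshot`: the base implementation writes and validates the version, and subclasses extend it. Its `getCompatibleVersions` and `getReadVersion` methods mirror the versioning hooks discussed in this section. `CharsetSnapshot` is an invented snapshot for a string serializer that gained a field in version 2. Plain `java.io` streams stand in for Flink's `DataInputView`/`DataOutputView`, and the real base class may differ in detail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical base class: handles the version so subclasses only add their own fields. */
abstract class VersionedSnapshotSketch {
    private int readVersion = -1;

    /** Version written with every snapshot. */
    abstract int getVersion();

    /** Versions this implementation can read; by default only the current one. */
    int[] getCompatibleVersions() {
        return new int[] {getVersion()};
    }

    /** Version of the snapshot that was actually read. */
    int getReadVersion() {
        return readVersion;
    }

    void write(DataOutput out) throws IOException {
        out.writeInt(getVersion());
    }

    void read(DataInput in) throws IOException {
        int version = in.readInt();
        for (int compatible : getCompatibleVersions()) {
            if (compatible == version) {
                readVersion = version;
                return;
            }
        }
        throw new IOException("Unsupported snapshot version: " + version);
    }
}

/** Hypothetical snapshot of a string serializer; the 'nullable' flag was added in version 2. */
final class CharsetSnapshot extends VersionedSnapshotSketch {
    String charsetName = "UTF-8";
    boolean nullable;

    CharsetSnapshot() {} // empty constructor, used when reading a snapshot back

    CharsetSnapshot(String charsetName, boolean nullable) {
        this.charsetName = charsetName;
        this.nullable = nullable;
    }

    @Override
    int getVersion() {
        return 2;
    }

    @Override
    int[] getCompatibleVersions() {
        return new int[] {1, 2}; // version-1 snapshots remain readable
    }

    @Override
    void write(DataOutput out) throws IOException {
        super.write(out); // extend, don't replace: the version goes first
        out.writeUTF(charsetName);
        out.writeBoolean(nullable);
    }

    @Override
    void read(DataInput in) throws IOException {
        super.read(in); // reads and validates the version
        charsetName = in.readUTF();
        if (getReadVersion() >= 2) {
            nullable = in.readBoolean(); // absent in version-1 snapshots
        }
    }

    static byte[] toBytes(CharsetSnapshot snapshot) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            snapshot.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static CharsetSnapshot fromBytes(byte[] data) {
        CharsetSnapshot snapshot = new CharsetSnapshot(); // relies on the empty constructor
        try {
            snapshot.read(new DataInputStream(new ByteArrayInputStream(data)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return snapshot;
    }
}
```

`CharsetSnapshot` calls `super.write` and `super.read` instead of replacing them. As a result, a version-1 snapshot remains readable after the class evolves. An unknown version fails fast with an `IOException`, which the helper methods wrap in `UncheckedIOException`.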
+ +The version of the configuration snapshot is determined through the `getVersion` method. Versioning for the serializer +configuration snapshot is the means to maintain compatible configurations, as information included in the configuration +may change over time. By default, configuration snapshots are only compatible with the current version (as returned by +`getVersion`). To indicate that the configuration is compatible with other versions, override the `getCompatibleVersions` +method to return more version values. When reading from the checkpoint, you can use the `getReadVersion` method to +determine the version of the written configuration and adapt the read logic to the specific version. + +<span class="label label-danger">Attention</span> The version of the serializer's configuration snapshot is **not** +related to upgrading the serializer. The exact same serializer can have different implementations of its +configuration snapshot, for example when more information is added to the configuration to allow more comprehensive +compatibility checks in the future. + +One limitation of implementing a `TypeSerializerConfigSnapshot` is that an empty constructor must be present. The empty +constructor is required when reading the configuration snapshot from checkpoints. + +#### Implementing the `ensureCompatibility` method + +The `ensureCompatibility` method should contain logic that performs checks against the information about the previous +serializer carried over via the provided `TypeSerializerConfigSnapshot`, basically doing one of the following: + + * Check whether the serializer is compatible, while possibly reconfiguring itself (if required) so that it may be + compatible. Afterwards, acknowledge with Flink that the serializer is compatible. + + * Acknowledge that the serializer is incompatible and that state migration is required before Flink can proceed with + using the new serializer. 
+ +The above cases can be translated to code by returning one of the following from the `ensureCompatibility` method: + + * **`CompatibilityResult.compatible()`**: This acknowledges that the new serializer is compatible, or has been reconfigured to + be compatible, and Flink can proceed with the job with the serializer as is. + + * **`CompatibilityResult.requiresMigration()`**: This acknowledges that the serializer is incompatible, or cannot be + reconfigured to be compatible, and requires a state migration before the new serializer can be used. State migration + is performed by using the previous serializer to read the restored state bytes into objects, which are then serialized again + using the new serializer. + + * **`CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`**: This acknowledgement has equivalent semantics + to `CompatibilityResult.requiresMigration()`, but in the case that the previous serializer cannot be found or loaded + to read the restored state bytes for the migration, the provided `TypeDeserializer` can be used as a fallback. + +<span class="label label-danger">Attention</span> As of Flink 1.3, if the result of the compatibility check +acknowledges that state migration needs to be performed, the job simply fails to restore from the checkpoint because state +migration is not yet available. The ability to migrate state will be introduced in future releases. + +### Managing `TypeSerializer` and `TypeSerializerConfigSnapshot` classes in user code + +Since `TypeSerializer`s and `TypeSerializerConfigSnapshot`s are written as part of checkpoints along with the state +values, the availability of the classes within the classpath may affect restore behaviour. + +`TypeSerializer`s are directly written into checkpoints using Java Object Serialization.
In the case that the new +serializer acknowledges that it is incompatible and requires state migration, the original serializer must be present to +read the restored state bytes. Therefore, if the original serializer class no longer exists or has been modified +(resulting in a different `serialVersionUID`) as a result of a serializer upgrade for the state, the restore +cannot proceed. The alternative to this requirement is to provide a fallback `TypeDeserializer` when +acknowledging that state migration is required, using `CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`. + +The classes of the `TypeSerializerConfigSnapshot`s in the restored checkpoint must exist in the classpath, as they are +fundamental to compatibility checks on upgraded serializers, and the snapshots cannot be restored if their classes +are missing. Since configuration snapshots are written to checkpoints using custom serialization, the implementation +of the class is free to be changed, as long as compatibility of the configuration change is handled using the versioning +mechanisms in `TypeSerializerConfigSnapshot`. http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/index.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/state/index.md b/docs/dev/stream/state/index.md new file mode 100644 index 0000000..d1c0edb --- /dev/null +++ b/docs/dev/stream/state/index.md @@ -0,0 +1,56 @@ + --- +title: "State & Fault Tolerance" +nav-id: streaming_state +nav-title: "State & Fault Tolerance" +nav-parent_id: streaming +nav-pos: 3 +nav-show_overview: true +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership.
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for +any type of more elaborate operation. + +For example: + + - When an application searches for certain event patterns, the state will store the sequence of events encountered so far. + - When aggregating events per minute/hour/day, the state holds the pending aggregates. + - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters. + - When historical data needs to be managed, the state allows efficient access to events that occurred in the past. + +Flink needs to be aware of the state in order to make state fault tolerant using [checkpoints](checkpointing.html) and to allow [savepoints]({{ site.baseurl }}/ops/state/savepoints.html) of streaming applications. + +Knowledge about the state also allows for rescaling Flink applications, meaning that Flink takes care of redistributing state across parallel instances. + +The [queryable state](queryable_state.html) feature of Flink allows you to access state from outside of Flink during runtime. + +When working with state, it might also be useful to read about [Flink's state backends]({{ site.baseurl }}/ops/state/state_backends.html). Flink provides different state backends that specify how and where state is stored. State can be located on Java's heap or off-heap.
Depending on your state backend, Flink can also *manage* the state for the application, meaning Flink deals with the memory management (possibly spilling to disk if necessary) to allow applications to hold very large state. State backends can be configured without changing your application logic. + +{% top %} + +Where to go next? +----------------- + +* [Working with State](state.html): Shows how to use state in a Flink application and explains the different kinds of state. +* [Checkpointing](checkpointing.html): Describes how to enable and configure checkpointing for fault tolerance. +* [Queryable State](queryable_state.html): Explains how to access state from outside of Flink during runtime. +* [Custom Serialization for Managed State](custom_serialization.html): Discusses custom serialization logic for state and its upgrades. + +{% top %} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/queryable_state.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/state/queryable_state.md b/docs/dev/stream/state/queryable_state.md new file mode 100644 index 0000000..bd0d7fb --- /dev/null +++ b/docs/dev/stream/state/queryable_state.md @@ -0,0 +1,292 @@ +--- +title: "Queryable State" +nav-parent_id: streaming_state +nav-pos: 3 +is_beta: true +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* ToC +{:toc} + +<div class="alert alert-warning"> + <strong>Note:</strong> The client APIs for queryable state are currently in an evolving state and + there are <strong>no guarantees</strong> made about stability of the provided interfaces. It is + likely that there will be breaking API changes on the client side in the upcoming Flink versions. +</div> + +In a nutshell, this feature allows users to query Flink's managed partitioned state +(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) from outside of +Flink. For some scenarios, queryable state thus eliminates the need for distributed +operations/transactions with external systems such as key-value stores which are often the +bottleneck in practice. + +<div class="alert alert-warning"> + <strong>Attention:</strong> Queryable state accesses keyed state from a concurrent thread rather + than synchronizing with the operator and potentially blocking its operation. Since any state + backend using Java heap space, e.g. MemoryStateBackend or + FsStateBackend, does not work with copies when retrieving values but instead directly + references the stored values, read-modify-write patterns are unsafe and may cause the + queryable state server to fail due to concurrent modifications. + The RocksDBStateBackend is safe from these issues. +</div> + +## Making State Queryable + +In order to make state queryable, the queryable state server first needs to be enabled globally +by setting the `query.server.enable` configuration parameter to `true` (current default). 
+Then appropriate state needs to be made queryable by using either + +* a `QueryableStateStream`, a convenience object which behaves like a sink and offers incoming values as +queryable state, or +* `StateDescriptor#setQueryable(String queryableStateName)`, which makes the keyed state of an +operator queryable. + +The following sections explain the use of these two approaches. + +### Queryable State Stream + +A `KeyedStream` may offer its values as queryable state by using the following methods: + +{% highlight java %} +// ValueState +QueryableStateStream asQueryableState( + String queryableStateName, + ValueStateDescriptor stateDescriptor) + +// Shortcut for explicit ValueStateDescriptor variant +QueryableStateStream asQueryableState(String queryableStateName) + +// FoldingState +QueryableStateStream asQueryableState( + String queryableStateName, + FoldingStateDescriptor stateDescriptor) + +// ReducingState +QueryableStateStream asQueryableState( + String queryableStateName, + ReducingStateDescriptor stateDescriptor) +{% endhighlight %} + + +<div class="alert alert-info"> + <strong>Note:</strong> There is no queryable <code>ListState</code> sink as it would result in an ever-growing + list which may not be cleaned up and thus will eventually consume too much memory. +</div> + +A call to these methods returns a `QueryableStateStream`, which cannot be further transformed and +currently only holds the name as well as the value and key serializer for the queryable state +stream. It is comparable to a sink, and cannot be followed by further transformations. + +Internally a `QueryableStateStream` gets translated to an operator which uses all incoming +records to update the queryable state instance. 
+In a program like the following, all records of the keyed stream will be used to update the state +instance, either via `ValueState#update(value)` or `AppendingState#add(value)`, depending on +the chosen state variant: +{% highlight java %} +stream.keyBy(0).asQueryableState("query-name"); +{% endhighlight %} +This acts like the Scala API's `flatMapWithState`. + +### Managed Keyed State + +Managed keyed state of an operator +(see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state)) +can be made queryable by making the appropriate state descriptor queryable via +`StateDescriptor#setQueryable(String queryableStateName)`, as in the example below: +{% highlight java %} +ValueStateDescriptor<Tuple2<Long, Long>> descriptor = + new ValueStateDescriptor<>( + "average", // the state name + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information + Tuple2.of(0L, 0L)); // default value of the state, if nothing was set +descriptor.setQueryable("query-name"); // queryable state name +{% endhighlight %} +<div class="alert alert-info"> + <strong>Note:</strong> The <code>queryableStateName</code> parameter may be chosen arbitrarily and is only + used for queries. It does not have to be identical to the state's own name. +</div> + + +## Querying State + +The `QueryableStateClient` helper class may be used for queries against the `KvState` instances that +serve the state internally.
It needs to be set up with a valid `JobManager` address and port and is +created as follows: + +{% highlight java %} +final Configuration config = new Configuration(); +config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress); +config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort); + +final HighAvailabilityServices highAvailabilityServices = + HighAvailabilityServicesUtils.createHighAvailabilityServices( + config, + Executors.newSingleThreadScheduledExecutor(), + HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); + +QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices); +{% endhighlight %} + +The query method is: + +{% highlight java %} +Future<byte[]> getKvState( + JobID jobID, + String queryableStateName, + int keyHashCode, + byte[] serializedKeyAndNamespace) +{% endhighlight %} + +A call to this method returns a `Future` eventually holding the serialized state value for the +queryable state instance identified by `queryableStateName` of the job with ID `jobID`. The +`keyHashCode` is the hash code of the key as returned by `Object.hashCode()` and the +`serializedKeyAndNamespace` is the serialized key and namespace. +<div class="alert alert-info"> + <strong>Note:</strong> The client is asynchronous and can be shared by multiple threads. It needs + to be shut down via <code>QueryableStateClient#shutdown()</code> when unused in order to free + resources. +</div> + +The current implementation is still low-level in the sense that it only works with +serialized data, both for providing the key/namespace and for the returned results. It is the +responsibility of the user (or some follow-up utilities) to set up the serializers for this. The +advantage is that the query services do not have to deal with any class loading issues.
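To make this division of responsibility concrete, the following Java sketch shows a client decoding the opaque `byte[]` of a query result with a decoder that it supplies itself. Everything here is hypothetical and used only for illustration. This includes `QueryResultDecoding`, `CountAndSum`, and the assumed layout of two big-endian 64-bit longs (count, then sum). That layout is not guaranteed to match the bytes produced by Flink's serializers. A real client would use the job's own `TypeSerializer`, as in the example that follows.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

/**
 * Hypothetical client-side helper (not part of Flink) that turns the opaque byte[]
 * returned by a state query into a typed value.
 */
final class QueryResultDecoding {

    /** Typed view of the example state: a running (count, sum) pair. */
    static final class CountAndSum {
        final long count;
        final long sum;

        CountAndSum(long count, long sum) {
            this.count = count;
            this.sum = sum;
        }

        long average() {
            return count == 0 ? 0 : sum / count;
        }
    }

    /**
     * ASSUMPTION for illustration: the value was written as two big-endian longs
     * (count first, then sum). A real client must mirror the job's actual serializer.
     */
    static final Function<byte[], CountAndSum> COUNT_AND_SUM = bytes -> {
        ByteBuffer buffer = ByteBuffer.wrap(bytes); // ByteBuffer is big-endian by default
        long count = buffer.getLong();
        long sum = buffer.getLong();
        return new CountAndSum(count, sum);
    };

    /** Encodes a pair in the same assumed layout, e.g. to fake a query response in a test. */
    static byte[] encode(long count, long sum) {
        return ByteBuffer.allocate(2 * Long.BYTES).putLong(count).putLong(sum).array();
    }

    /** Applies the caller-supplied decoder; it must match the serializer used by the job. */
    static <T> T decode(byte[] serialized, Function<byte[], T> decoder) {
        return decoder.apply(serialized);
    }

    public static void main(String[] args) {
        CountAndSum value = decode(encode(3, 12), COUNT_AND_SUM);
        System.out.println(value.count + " " + value.sum + " " + value.average()); // prints 3 12 4
    }
}
```

Because the service only returns bytes, a mismatch between the client's decoder and the job's serializer does not fail on the server. It surfaces as wrong values or decoding errors on the client.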
+ +There are some serialization utils for key/namespace and value serialization included in +`KvStateRequestSerializer`. + +### Example + +The following example extends the `CountWindowAverage` example +(see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state)) +by making it queryable and showing how to query this value: + +{% highlight java %} +public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { + + private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum; + + @Override + public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { + Tuple2<Long, Long> currentSum = sum.value(); + currentSum.f0 += 1; + currentSum.f1 += input.f1; + sum.update(currentSum); + + if (currentSum.f0 >= 2) { + out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); + sum.clear(); + } + } + + @Override + public void open(Configuration config) { + ValueStateDescriptor<Tuple2<Long, Long>> descriptor = + new ValueStateDescriptor<>( + "average", // the state name + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information + Tuple2.of(0L, 0L)); // default value of the state, if nothing was set + descriptor.setQueryable("query-name"); + sum = getRuntimeContext().getState(descriptor); + } +} +{% endhighlight %} + +Once used in a job, you can retrieve the job ID and then query any key's current state from this operator: + +{% highlight java %} +final Configuration config = new Configuration(); +config.setString(JobManagerOptions.ADDRESS, queryAddress); +config.setInteger(JobManagerOptions.PORT, queryPort); + +final HighAvailabilityServices highAvailabilityServices = + HighAvailabilityServicesUtils.createHighAvailabilityServices( + config, + Executors.newSingleThreadScheduledExecutor(), + HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); + +QueryableStateClient client = new 
QueryableStateClient(config, highAvailabilityServices); + +final TypeSerializer<Long> keySerializer = + TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig()); +final TypeSerializer<Tuple2<Long, Long>> valueSerializer = + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig()); + +final byte[] serializedKey = + KvStateRequestSerializer.serializeKeyAndNamespace( + key, keySerializer, + VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE); + +Future<byte[]> serializedResult = + client.getKvState(jobId, "query-name", key.hashCode(), serializedKey); + +// now wait for the result and return it +final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS); +byte[] serializedValue = Await.result(serializedResult, duration); +Tuple2<Long, Long> value = + KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer); +{% endhighlight %} + +### Note for Scala Users + +Please use the available Scala extensions when creating the `TypeSerializer` instances. Add the following import: + +```scala +import org.apache.flink.streaming.api.scala._ +``` + +Now you can create the type serializers as follows: + +```scala +val keySerializer = createTypeInformation[Long] + .createSerializer(new ExecutionConfig) +``` + +If you don't do this, you can run into mismatches between the serializers used in the Flink job and in your client code, because types like `scala.Long` cannot be captured at runtime. + +## Configuration + +The following configuration parameters influence the behaviour of the queryable state server and client. +They are defined in `QueryableStateOptions`.
+ +### Server +* `query.server.enable`: flag to indicate whether to start the queryable state server +* `query.server.port`: port to bind to the internal `KvStateServer` (0 => pick random available port) +* `query.server.network-threads`: number of network (event loop) threads for the `KvStateServer` (0 => #slots) +* `query.server.query-threads`: number of asynchronous query threads for the `KvStateServerHandler` (0 => #slots). + +### Client (`QueryableStateClient`) +* `query.client.network-threads`: number of network (event loop) threads for the `KvStateClient` (0 => number of available cores) +* `query.client.lookup.num-retries`: number of retries on location lookup failures +* `query.client.lookup.retry-delay`: retry delay on location lookup failures (millis) + +## Limitations + +* The queryable state life-cycle is bound to the life-cycle of the job, e.g. tasks register +queryable state on startup and unregister it on disposal. In future versions, it is desirable to +decouple this in order to allow queries after a task finishes, and to speed up recovery via state +replication. +* Notifications about available KvState happen via a simple tell. In the future this should be improved to be +more robust with asks and acknowledgements. +* The server and client keep track of statistics for queries. These are currently disabled by +default as they would not be exposed anywhere. As soon as there is better support to publish these +numbers via the Metrics system, we should enable the stats. 
http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/state.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md new file mode 100644 index 0000000..f280ceb --- /dev/null +++ b/docs/dev/stream/state/state.md @@ -0,0 +1,596 @@ +--- +title: "Working with State" +nav-parent_id: streaming_state +nav-pos: 1 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +This document explains how to use Flink's state abstractions when developing an application. + +* ToC +{:toc} + +## Keyed State and Operator State + +There are two basic kinds of state in Flink: `Keyed State` and `Operator State`. + +### Keyed State + +*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`. + +You can think of Keyed State as Operator State that has been partitioned, +or sharded, with exactly one state-partition per key. +Each keyed-state is logically bound to a unique +composite of <parallel-operator-instance, key>, and since each key +"belongs" to exactly one parallel instance of a keyed operator, we can +think of this simply as <operator, key>. 
+ +Keyed State is further organized into so-called *Key Groups*. Key Groups are the +atomic unit by which Flink can redistribute Keyed State; +there are exactly as many Key Groups as the defined maximum parallelism. +During execution each parallel instance of a keyed operator works with the keys +for one or more Key Groups. + +### Operator State + +With *Operator State* (or *non-keyed state*), each operator state is +bound to one parallel operator instance. +The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good motivating example for the use of Operator State +in Flink. Each parallel instance of the Kafka consumer maintains a map +of topic partitions and offsets as its Operator State. + +The Operator State interfaces support redistributing state among +parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution. + +## Raw and Managed State + +*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*. + +*Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables or RocksDB. +Examples are `ValueState`, `ListState`, etc. Flink's runtime encodes +the states and writes them into the checkpoints. + +*Raw State* is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into +the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes. + +All DataStream functions can use managed state, but the raw state interfaces can only be used when implementing operators. +Using managed state (rather than raw state) is recommended, since with +managed state Flink is able to automatically redistribute state when the parallelism is +changed, and can also manage memory better.
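
The relationship between keys, Key Groups and parallel instances described above can be sketched in plain Java. This is a simplified model rather than Flink's code. `KeyGroupSketch` is a hypothetical name, and the key group is computed as `Math.floorMod(key.hashCode(), maxParallelism)`, whereas Flink additionally applies a murmur hash to the key's hash code, which this sketch omits. Key groups are assigned to instances in contiguous ranges via `keyGroup * parallelism / maxParallelism`. Note that a key's Key Group does not depend on the current parallelism: rescaling only changes which instance owns which range of Key Groups.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of Key Group assignment (Flink also murmur-hashes
// the key's hashCode before taking the modulo; that step is omitted here).
final class KeyGroupSketch {

    private KeyGroupSketch() {}

    // A key always lands in the same one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Contiguous ranges of key groups are owned by one parallel instance each.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Lists the key groups owned by one instance at the given parallelism.
    static List<Integer> keyGroupsOf(int operatorIndex, int maxParallelism, int parallelism) {
        List<Integer> groups = new ArrayList<>();
        for (int kg = 0; kg < maxParallelism; kg++) {
            if (operatorIndexFor(kg, maxParallelism, parallelism) == operatorIndex) {
                groups.add(kg);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        int maxParallelism = 8;
        // Rescaling from 2 to 3 instances moves whole key groups, never individual keys.
        for (int parallelism : new int[] {2, 3}) {
            for (int i = 0; i < parallelism; i++) {
                System.out.println("parallelism " + parallelism + ", instance " + i
                        + " -> key groups " + keyGroupsOf(i, maxParallelism, parallelism));
            }
        }
    }
}
```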
+ +<span class="label label-danger">Attention</span> If your managed state needs custom serialization logic, please see +the [corresponding guide](custom_serialization.html) in order to ensure future compatibility. Flink's default serializers +don't need special treatment. + +## Using Managed Keyed State + +The managed keyed state interface provides access to different types of state that are all scoped to +the key of the current input element. This means that this type of state can only be used +on a `KeyedStream`, which can be created via `stream.keyBy(…)`. + +We will first look at the different types of state available and then we will see +how they can be used in a program. The available state primitives are: + +* `ValueState<T>`: This keeps a value that can be updated and +retrieved (scoped to the key of the input element as mentioned above, so there will possibly be one value +for each key that the operation sees). The value can be set using `update(T)` and retrieved using +`T value()`. + +* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable` +over all currently stored elements. Elements are added using `add(T)`, and the `Iterable` can +be retrieved using `Iterable<T> get()`. + +* `ReducingState<T>`: This keeps a single value that represents the aggregation of all values +added to the state. The interface is the same as for `ListState` but elements added using +`add(T)` are reduced to an aggregate using a specified `ReduceFunction`. + +* `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values +added to the state. Unlike `ReducingState`, the aggregate type may be different from the type +of elements that are added to the state. The interface is the same as for `ListState` but elements +added using `add(T)` are folded into an aggregate using a specified `FoldFunction`. + +* `MapState<UK, UV>`: This keeps a list of mappings.
You can put key-value pairs into the state and +retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or +`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable +views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively. + +All types of state also have a method `clear()` that clears the state for the currently +active key, i.e. the key of the input element. + +<span class="label label-danger">Attention</span> `FoldingState` will be deprecated in one of +the next versions of Flink and will be completely removed in the future. A more general +alternative will be provided. + +It is important to keep in mind that these state objects are only used for interfacing +with state. The state is not necessarily stored inside them; it might reside on disk or elsewhere. +The second thing to keep in mind is that the value you get from the state +depends on the key of the input element. So the value you get in one invocation of your +user function can differ from the value in another invocation if the keys involved are different. + +To get a state handle, you have to create a `StateDescriptor`. This holds the name of the state +(as we will see later, you can create several states, and they have to have unique names so +that you can reference them), the type of the values that the state holds, and possibly +a user-specified function, such as a `ReduceFunction`. Depending on what type of state you +want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`, +a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`. + +State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*. +Please see [here]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) for +information about that, but we will also see an example shortly.
The `RuntimeContext` that +is available in a `RichFunction` has these methods for accessing state: + +* `ValueState<T> getState(ValueStateDescriptor<T>)` +* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)` +* `ListState<T> getListState(ListStateDescriptor<T>)` +* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)` +* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)` + +This is an example `FlatMapFunction` that shows how all of the parts fit together: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { + + /** + * The ValueState handle. The first field is the count, the second field a running sum. + */ + private transient ValueState<Tuple2<Long, Long>> sum; + + @Override + public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { + + // access the state value + Tuple2<Long, Long> currentSum = sum.value(); + + // update the count + currentSum.f0 += 1; + + // add the second field of the input value + currentSum.f1 += input.f1; + + // update the state + sum.update(currentSum); + + // if the count reaches 2, emit the average and clear the state + if (currentSum.f0 >= 2) { + out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); + sum.clear(); + } + } + + @Override + public void open(Configuration config) { + ValueStateDescriptor<Tuple2<Long, Long>> descriptor = + new ValueStateDescriptor<>( + "average", // the state name + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information + Tuple2.of(0L, 0L)); // default value of the state, if nothing was set + sum = getRuntimeContext().getState(descriptor); + } +} + +// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env) +env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), 
Tuple2.of(1L, 2L)) + .keyBy(0) + .flatMap(new CountWindowAverage()) + .print(); + +// the printed output will be (1,4) and (1,5) +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] { + + private var sum: ValueState[(Long, Long)] = _ + + override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = { + + // access the state value + val tmpCurrentSum = sum.value + + // If it hasn't been used before, it will be null + val currentSum = if (tmpCurrentSum != null) { + tmpCurrentSum + } else { + (0L, 0L) + } + + // update the count + val newSum = (currentSum._1 + 1, currentSum._2 + input._2) + + // update the state + sum.update(newSum) + + // if the count reaches 2, emit the average and clear the state + if (newSum._1 >= 2) { + out.collect((input._1, newSum._2 / newSum._1)) + sum.clear() + } + } + + override def open(parameters: Configuration): Unit = { + sum = getRuntimeContext.getState( + new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]) + ) + } +} + + +object ExampleCountWindowAverage extends App { + val env = StreamExecutionEnvironment.getExecutionEnvironment + + env.fromCollection(List( + (1L, 3L), + (1L, 5L), + (1L, 7L), + (1L, 4L), + (1L, 2L) + )).keyBy(_._1) + .flatMap(new CountWindowAverage()) + .print() + // the printed output will be (1,4) and (1,5) + + env.execute("ExampleManagedState") +} +{% endhighlight %} +</div> +</div> + +This example implements a poor man's counting window. We key the tuples by the first field +(in the example all have the same key `1`). The function stores the count and a running sum in +a `ValueState`. Once the count reaches 2 it will emit the average and clear the state so that +we start over from `0`. Note that this would keep a different state value for each different input +key if we had tuples with different values in the first field. 
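
To make the state semantics of the example concrete without a Flink cluster, the following plain-Java simulation replays the same logic. A `HashMap` stands in for the keyed `ValueState`, holding one `(count, sum)` entry per key. `CountWindowAverageSim` is a hypothetical helper and not part of Flink. Feeding it the five tuples from the example yields `(1,4)` and `(1,5)`. Interleaving records for a second key shows that each key's state evolves independently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java simulation of CountWindowAverage; the HashMap plays the role
// of keyed ValueState, holding one {count, sum} pair per key.
final class CountWindowAverageSim {

    private CountWindowAverageSim() {}

    // Processes (key, value) records in order and returns the emitted (key, average) pairs.
    static List<long[]> run(long[][] records) {
        Map<Long, long[]> state = new HashMap<>();
        List<long[]> out = new ArrayList<>();
        for (long[] record : records) {
            long key = record[0];
            // like sum.value() with the default value (0, 0)
            long[] current = state.getOrDefault(key, new long[] {0L, 0L});
            current[0] += 1;          // update the count
            current[1] += record[1];  // add the second field to the running sum
            state.put(key, current);  // like sum.update(currentSum)
            if (current[0] >= 2) {
                out.add(new long[] {key, current[1] / current[0]});
                state.remove(key);    // like sum.clear()
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] input = {{1L, 3L}, {1L, 5L}, {1L, 7L}, {1L, 4L}, {1L, 2L}};
        for (long[] r : run(input)) {
            System.out.println("(" + r[0] + "," + r[1] + ")");
        }
    }
}
```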
+ +### State in the Scala DataStream API + +In addition to the interface described above, the Scala API has shortcuts for stateful +`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. The user function +gets the current value of the `ValueState` in an `Option` and must return an updated value that +will be used to update the state. + +{% highlight scala %} +val stream: DataStream[(String, Int)] = ... + +val counts: DataStream[(String, Int)] = stream + .keyBy(_._1) + .mapWithState((in: (String, Int), count: Option[Int]) => + count match { + case Some(c) => ( (in._1, c), Some(c + in._2) ) + case None => ( (in._1, 0), Some(in._2) ) + }) +{% endhighlight %} + +## Using Managed Operator State + +To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction` +interface, or the `ListCheckpointed<T extends Serializable>` interface. + +#### CheckpointedFunction + +The `CheckpointedFunction` interface provides access to non-keyed state with different +redistribution schemes. It requires the implementation of two methods: + +{% highlight java %} +void snapshotState(FunctionSnapshotContext context) throws Exception; + +void initializeState(FunctionInitializationContext context) throws Exception; +{% endhighlight %} + +Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`, +is called every time the user-defined function is initialized, both when the function is first initialized +and when it is recovering from an earlier checkpoint. Given this, `initializeState()` is not +only the place where different types of state are initialized, but also where state recovery logic is included. + +Currently, list-style managed operator state is supported. The state +is expected to be a `List` of *serializable* objects, independent of each other, +and thus eligible for redistribution upon rescaling.
In other words, these objects are the finest granularity at which +non-keyed state can be redistributed. Depending on the state accessing method, +the following redistribution schemes are defined: + + - **Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of + all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. + Each operator gets a sublist, which can be empty, or contain one or more elements. + As an example, if with parallelism 1 the checkpointed state of an operator + contains elements `element1` and `element2`, when increasing the parallelism to 2, `element1` may end up in operator instance 0, + while `element2` will go to operator instance 1. + + - **Union redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of + all lists. On restore/redistribution, each operator gets the complete list of state elements. + +Below is an example of a stateful `SinkFunction` that uses `CheckpointedFunction` +to buffer elements before sending them to the outside world. 
It demonstrates +the basic even-split redistribution list state: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public class BufferingSink + implements SinkFunction<Tuple2<String, Integer>>, + CheckpointedFunction, + CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { + + private final int threshold; + + private transient ListState<Tuple2<String, Integer>> checkpointedState; + + private List<Tuple2<String, Integer>> bufferedElements; + + public BufferingSink(int threshold) { + this.threshold = threshold; + this.bufferedElements = new ArrayList<>(); + } + + @Override + public void invoke(Tuple2<String, Integer> value) throws Exception { + bufferedElements.add(value); + if (bufferedElements.size() == threshold) { + for (Tuple2<String, Integer> element: bufferedElements) { + // send it to the sink + } + bufferedElements.clear(); + } + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + checkpointedState.clear(); + for (Tuple2<String, Integer> element : bufferedElements) { + checkpointedState.add(element); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + ListStateDescriptor<Tuple2<String, Integer>> descriptor = + new ListStateDescriptor<>( + "buffered-elements", + TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); + + checkpointedState = context.getOperatorStateStore().getListState(descriptor); + + if (context.isRestored()) { + for (Tuple2<String, Integer> element : checkpointedState.get()) { + bufferedElements.add(element); + } + } + } + + @Override + public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { + // this is from the CheckpointedRestoring interface. 
+ this.bufferedElements.addAll(state); + } +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +class BufferingSink(threshold: Int = 0) + extends SinkFunction[(String, Int)] + with CheckpointedFunction + with CheckpointedRestoring[List[(String, Int)]] { + + @transient + private var checkpointedState: ListState[(String, Int)] = null + + private val bufferedElements = ListBuffer[(String, Int)]() + + override def invoke(value: (String, Int)): Unit = { + bufferedElements += value + if (bufferedElements.size == threshold) { + for (element <- bufferedElements) { + // send it to the sink + } + bufferedElements.clear() + } + } + + override def snapshotState(context: FunctionSnapshotContext): Unit = { + checkpointedState.clear() + for (element <- bufferedElements) { + checkpointedState.add(element) + } + } + + override def initializeState(context: FunctionInitializationContext): Unit = { + val descriptor = new ListStateDescriptor[(String, Int)]( + "buffered-elements", + TypeInformation.of(new TypeHint[(String, Int)]() {}) + ) + + checkpointedState = context.getOperatorStateStore.getListState(descriptor) + + if(context.isRestored) { + for(element <- checkpointedState.get()) { + bufferedElements += element + } + } + } + + override def restoreState(state: List[(String, Int)]): Unit = { + bufferedElements ++= state + } +} +{% endhighlight %} +</div> +</div> + +The `initializeState` method takes a `FunctionInitializationContext` as its argument. This context is used to initialize +the non-keyed state "containers", i.e. containers of type `ListState` in which the non-keyed state objects +are stored upon checkpointing.
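
The two list-state redistribution schemes described earlier (even-split and union) can be modeled in plain Java as follows. This is an illustration under assumptions. `RedistributionSketch` is hypothetical, and its even split hands out contiguous sublists whose sizes differ by at most one. That matches the `element1`/`element2` example given with the schemes, but it may differ from the exact assignment Flink's repartitioner makes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of list-state redistribution on rescaling (not Flink's repartitioner).
final class RedistributionSketch {

    private RedistributionSketch() {}

    // The whole state is logically the concatenation of all per-instance lists.
    static <T> List<T> concat(List<List<T>> perInstance) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perInstance) {
            all.addAll(part);
        }
        return all;
    }

    // Even-split: contiguous sublists whose sizes differ by at most one (some may be empty).
    static <T> List<List<T>> evenSplit(List<List<T>> perInstance, int newParallelism) {
        List<T> all = concat(perInstance);
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        List<List<T>> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union: every new instance receives the complete list.
    static <T> List<List<T>> union(List<List<T>> perInstance, int newParallelism) {
        List<T> all = concat(perInstance);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // State checkpointed with parallelism 1, restored with parallelism 2.
        List<List<String>> checkpointed = Arrays.asList(Arrays.asList("element1", "element2"));
        System.out.println(evenSplit(checkpointed, 2)); // [[element1], [element2]]
        System.out.println(union(checkpointed, 2));     // [[element1, element2], [element1, element2]]
    }
}
```

With the even split, an instance may receive an empty list. A function using `getListState()` therefore has to cope with restoring nothing.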
+ +Note how the state is initialized, similar to keyed state, +with a `StateDescriptor` that contains the state name and information +about the type of the value that the state holds: + + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +ListStateDescriptor<Tuple2<String, Integer>> descriptor = + new ListStateDescriptor<>( + "buffered-elements", + TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})); + +checkpointedState = context.getOperatorStateStore().getListState(descriptor); +{% endhighlight %} + +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} + +val descriptor = new ListStateDescriptor[(String, Int)]( + "buffered-elements", + TypeInformation.of(new TypeHint[(String, Int)]() {}) +) + +checkpointedState = context.getOperatorStateStore.getListState(descriptor) + +{% endhighlight %} +</div> +</div> + +The name of each state access method contains its redistribution +pattern followed by its state structure. For example, to use list state with the +union redistribution scheme on restore, access the state by using `getUnionListState(descriptor)`. +If the method name does not contain the redistribution pattern, *e.g.* `getListState(descriptor)`, +it simply implies that the basic even-split redistribution scheme will be used. + +After initializing the container, we use the `isRestored()` method of the context to check if we are +recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied. + +As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state +initialization is kept in a class variable for future use in `snapshotState()`. There the `ListState` is cleared +of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint. + +As a side note, the keyed state can also be initialized in the `initializeState()` method.
This can be done +using the provided `FunctionInitializationContext`. + +#### ListCheckpointed + +The `ListCheckpointed` interface is a more limited variant of `CheckpointedFunction`, +which only supports list-style state with the even-split redistribution scheme on restore. +It also requires the implementation of two methods: + +{% highlight java %} +List<T> snapshotState(long checkpointId, long timestamp) throws Exception; + +void restoreState(List<T> state) throws Exception; +{% endhighlight %} + +On `snapshotState()` the operator should return a list of objects to checkpoint, and +`restoreState()` has to handle such a list upon recovery. If the state is not re-partitionable, you can always +return a `Collections.singletonList(MY_STATE)` in `snapshotState()`. + +### Stateful Source Functions + +Stateful sources require a bit more care than other operators. +In order to make the updates to the state and output collection atomic (required for exactly-once semantics +on failure/recovery), the user is required to hold the lock obtained from the source's context via `getCheckpointLock()`.
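
The following plain-Java model shows why the lock matters. If emitting a record and advancing the offset happen inside one `synchronized` block, any snapshot that takes the same lock observes a consistent pair. In this sketch, `CheckpointLockSketch` is hypothetical and `emittedCount` stands in for records passed to `ctx.collect(...)`. The worker also stops after a fixed number of records so that the demo terminates. In Flink, the lock object is the one returned by `SourceContext#getCheckpointLock()`, as in the example that follows.

```java
// Hypothetical model of a stateful source that emits and updates its offset under one lock.
final class CheckpointLockSketch {

    private final Object checkpointLock = new Object();
    private final long maxRecords;          // bound so the demo terminates
    private long emittedCount = 0L;         // stands in for records passed to ctx.collect(...)
    private long offset = 0L;               // the source's checkpointed state
    private volatile boolean isRunning = true;

    CheckpointLockSketch(long maxRecords) {
        this.maxRecords = maxRecords;
    }

    // Source loop: emission and offset update are atomic with respect to snapshot().
    void run() {
        while (isRunning && offset < maxRecords) {
            synchronized (checkpointLock) {
                emittedCount += 1;
                offset += 1;
            }
        }
    }

    void cancel() {
        isRunning = false;
    }

    // A "checkpoint": reads both values while holding the same lock.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {emittedCount, offset};
        }
    }

    // Runs the source on a background thread and checks every snapshot for consistency.
    static boolean snapshotsAreConsistent(int snapshots, long maxRecords) {
        CheckpointLockSketch source = new CheckpointLockSketch(maxRecords);
        Thread worker = new Thread(source::run);
        worker.start();
        boolean consistent = true;
        try {
            for (int i = 0; i < snapshots; i++) {
                long[] s = source.snapshot();
                consistent &= (s[0] == s[1]);
            }
        } finally {
            source.cancel();
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return consistent;
    }

    public static void main(String[] args) {
        System.out.println(snapshotsAreConsistent(100, 1_000_000L));
    }
}
```

Without the `synchronized` blocks, a snapshot could be taken between the two increments and record an offset that does not match what was emitted. After a restore, that would produce duplicated or skipped records.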
+ +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +public static class CounterSource + extends RichParallelSourceFunction<Long> + implements ListCheckpointed<Long> { + + /** current offset for exactly once semantics */ + private Long offset = 0L; + + /** flag for job cancellation */ + private volatile boolean isRunning = true; + + @Override + public void run(SourceContext<Long> ctx) { + final Object lock = ctx.getCheckpointLock(); + + while (isRunning) { + // output and state update are atomic + synchronized (lock) { + ctx.collect(offset); + offset += 1; + } + } + } + + @Override + public void cancel() { + isRunning = false; + } + + @Override + public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) { + return Collections.singletonList(offset); + } + + @Override + public void restoreState(List<Long> state) { + for (Long s : state) + offset = s; + } +} +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +class CounterSource + extends RichParallelSourceFunction[Long] + with ListCheckpointed[Long] { + + @volatile + private var isRunning = true + + private var offset = 0L + + override def run(ctx: SourceFunction.SourceContext[Long]): Unit = { + val lock = ctx.getCheckpointLock + + while (isRunning) { + // output and state update are atomic + lock.synchronized({ + ctx.collect(offset) + + offset += 1 + }) + } + } + + override def cancel(): Unit = isRunning = false + + override def restoreState(state: util.List[Long]): Unit = + for (s <- state) { + offset = s + } + + override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] = + Collections.singletonList(offset) + +} +{% endhighlight %} +</div> +</div> + +Some operators might need to know when a checkpoint has been fully acknowledged by Flink, for example to communicate that to the outside world. In this case, see the `org.apache.flink.runtime.state.CheckpointListener` interface.
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/state_backends.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/state/state_backends.md b/docs/dev/stream/state/state_backends.md new file mode 100644 index 0000000..1357f2e --- /dev/null +++ b/docs/dev/stream/state/state_backends.md @@ -0,0 +1,46 @@ +--- +title: "State Backends" +nav-parent_id: streaming_state +nav-pos: 5 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +Flink provides different state backends that specify how and where state is stored. + +State can be located on Java's heap or off-heap. Depending on your state backend, Flink can also manage the state for the application, meaning Flink deals with the memory management (possibly spilling to disk if necessary) to allow applications to hold very large state. By default, the configuration file *flink-conf.yaml* determines the state backend for all Flink jobs. + +However, the default state backend can be overridden on a per-job basis, as shown below.
+ +For more information about the available state backends, their advantages, limitations, and configuration parameters see the corresponding section in [Deployment & Operations]({{ site.baseurl }}/ops/state/state_backends.html). + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setStateBackend(...); +{% endhighlight %} +</div> +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val env = StreamExecutionEnvironment.getExecutionEnvironment() +env.setStateBackend(...) +{% endhighlight %} +</div> +</div> http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/table/index.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/index.md b/docs/dev/table/index.md index df2ccba..5845c95 100644 --- a/docs/dev/table/index.md +++ b/docs/dev/table/index.md @@ -74,7 +74,7 @@ Where to go next? * [Concepts & Common API]({{ site.baseurl }}/dev/table/common.html): Shared concepts and APIs of the Table API and SQL. * [Streaming Table API & SQL]({{ site.baseurl }}/dev/table/streaming.html): Streaming-specific documentation for the Table API or SQL such as configuration of time attributes and handling of updating results. -* [Table API]({{ site.baseurl }}/dev/table/tableapi.html): Supported operations and API for the Table API. +* [Table API]({{ site.baseurl }}/dev/table/tableApi.html): Supported operations and API for the Table API. * [SQL]({{ site.baseurl }}/dev/table/sql.html): Supported operations and syntax for SQL * [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html): Reading tables from and emitting tables to external storage systems. * [User-Defined Functions]({{ site.baseurl }}/dev/table/udfs.html): Definition and usage of user-defined functions. 
http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/internals/components.md ---------------------------------------------------------------------- diff --git a/docs/internals/components.md b/docs/internals/components.md index b750ffd..cf3b659 100644 --- a/docs/internals/components.md +++ b/docs/internals/components.md @@ -54,6 +54,6 @@ You can click on the components in the figure to learn more. <area id="dataset" title="DataSet API" href="{{ site.baseurl }}/dev/batch/index.html" shape="rect" coords="382,177,697,255" /> <area id="runtime" title="Runtime" href="{{ site.baseurl }}/concepts/runtime.html" shape="rect" coords="63,257,700,335" /> <area id="local" title="Local" href="{{ site.baseurl }}/quickstart/setup_quickstart.html" shape="rect" coords="62,337,275,414" /> -<area id="cluster" title="Cluster" href="{{ site.baseurl }}/setup/cluster_setup.html" shape="rect" coords="273,336,486,413" /> -<area id="cloud" title="Cloud" href="{{ site.baseurl }}/setup/gce_setup.html" shape="rect" coords="485,336,700,414" /> +<area id="cluster" title="Cluster" href="{{ site.baseurl }}/ops/deployment/cluster_setup.html" shape="rect" coords="273,336,486,413" /> +<area id="cloud" title="Cloud" href="{{ site.baseurl }}/ops/deployment/gce_setup.html" shape="rect" coords="485,336,700,414" /> </map> http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/internals/stream_checkpointing.md ---------------------------------------------------------------------- diff --git a/docs/internals/stream_checkpointing.md b/docs/internals/stream_checkpointing.md index d701c5e..330b0aa 100644 --- a/docs/internals/stream_checkpointing.md +++ b/docs/internals/stream_checkpointing.md @@ -45,7 +45,7 @@ The system then restarts the operators and resets them to the latest successful point of the state snapshot. Any records that are processed as part of the restarted parallel dataflow are guaranteed to not have been part of the previously checkpointed state. 
-*Note:* By default, checkpointing is disabled. See [Checkpointing]({{ site.baseurl }}/dev/stream/checkpointing.html) for details on how to enable and configure checkpointing. +*Note:* By default, checkpointing is disabled. See [Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) for details on how to enable and configure checkpointing. *Note:* For this mechanism to realize its full guarantees, the data stream source (such as message queue or broker) needs to be able to rewind the stream to a defined recent point. [Apache Kafka](http://kafka.apache.org) has this ability and Flink's connector to @@ -106,10 +106,10 @@ the barrier *n* from the other inputs as well. Otherwise, it would mix records t When operators contain any form of *state*, this state must be part of the snapshots as well. Operator state comes in different forms: - - *User-defined state*: This is state that is created and modified directly by the transformation functions (like `map()` or `filter()`). See [State in Streaming Applications]({{ site.baseurl }}/dev/stream/state.html) for details. + - *User-defined state*: This is state that is created and modified directly by the transformation functions (like `map()` or `filter()`). See [State in Streaming Applications]({{ site.baseurl }}/dev/stream/state/index.html) for details. - *System state*: This state refers to data buffers that are part of the operator's computation. A typical example for this state are the *window buffers*, inside which the system collects (and aggregates) records for windows until the window is evaluated and evicted. -Operators snapshot their state at the point in time when they have received all snapshot barriers from their input streams, and before emitting the barriers to their output streams. At that point, all updates to the state from records before the barriers will have been made, and no updates that depend on records from after the barriers have been applied. 
Because the state of a snapshot may be large, it is stored in a configurable *[state backend]({{ site.baseurl }}/ops/state_backends.html)*. By default, this is the JobManager's memory, but for production use a distributed reliable storage should be configured (such as HDFS). After the state has been stored, the operator acknowledges the checkpoint, emits the snapshot barrier into the output streams, and proceeds. +Operators snapshot their state at the point in time when they have received all snapshot barriers from their input streams, and before emitting the barriers to their output streams. At that point, all updates to the state from records before the barriers will have been made, and no updates that depend on records from after the barriers have been applied. Because the state of a snapshot may be large, it is stored in a configurable *[state backend]({{ site.baseurl }}/ops/state/state_backends.html)*. By default, this is the JobManager's memory, but for production use a distributed reliable storage should be configured (such as HDFS). After the state has been stored, the operator acknowledges the checkpoint, emits the snapshot barrier into the output streams, and proceeds. The resulting snapshot now contains: @@ -145,7 +145,7 @@ It is possible to let an operator continue processing while it stores its state After receiving the checkpoint barriers on its inputs, the operator starts the asynchronous snapshot copying of its state. It immediately emits the barrier to its outputs and continues with the regular stream processing. Once the background copy process has completed, it acknowledges the checkpoint to the checkpoint coordinator (the JobManager). The checkpoint is now only complete after all sinks have received the barriers and all stateful operators have acknowledged their completed backup (which may be after the barriers reach the sinks). -See [State Backends]({{ site.baseurl }}/ops/state_backends.html) for details on the state snapshots. 
+See [State Backends]({{ site.baseurl }}/ops/state/state_backends.html) for details on the state snapshots. ## Recovery http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/internals/task_lifecycle.md ---------------------------------------------------------------------- diff --git a/docs/internals/task_lifecycle.md b/docs/internals/task_lifecycle.md index cc557a1..fed2cb9 100644 --- a/docs/internals/task_lifecycle.md +++ b/docs/internals/task_lifecycle.md @@ -89,7 +89,7 @@ and skips any intermediate phases between the phase the operator was in when the **Checkpoints:** The `snapshotState()` method of the operator is called asynchronously to the rest of the methods described above whenever a checkpoint barrier is received. Checkpoints are performed during the processing phase, *i.e.* after the operator is opened and before it is closed. The responsibility of this method is to store the current state of the operator -to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) from where it will be retrieved when +to the specified [state backend]({{ site.baseurl }}/ops/state/state_backends.html) from where it will be retrieved when the job resumes execution after a failure. Below we include a brief description of Flink's checkpointing mechanism, and for a more detailed discussion on the principles around checkpointing in Flink please read the corresponding documentation: [Data Streaming Fault Tolerance]({{ site.baseurl }}/internals/stream_checkpointing.html). @@ -125,7 +125,7 @@ first step for the task is to retrieve its initial, task-wide state. This is don particularly important in two cases: 1. when the task is recovering from a failure and restarts from the last successful checkpoint -2. when resuming from a [savepoint]({{ site.baseurl }}/setup/savepoints.html). +2. when resuming from a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html). If it is the first time the task is executed, the initial task state is empty. 
http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/monitoring/debugging_classloading.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/debugging_classloading.md b/docs/monitoring/debugging_classloading.md index 8c91e0f..d69870a 100644 --- a/docs/monitoring/debugging_classloading.md +++ b/docs/monitoring/debugging_classloading.md @@ -57,7 +57,7 @@ YARN classloading differs between single job deploymens and sessions: **Mesos** -Mesos setups following [this documentation](../setup/mesos.html) currently behave very much like the a +Mesos setups following [this documentation](../ops/deployment/mesos.html) currently behave very much like the a YARN session: The TaskManager and JobManager processes are started with the Flink framework classes in classpath, job classes are loaded dynamically when the jobs are submitted. http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/monitoring/historyserver.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/historyserver.md b/docs/monitoring/historyserver.md index 773f3be..e109512 100644 --- a/docs/monitoring/historyserver.md +++ b/docs/monitoring/historyserver.md @@ -71,7 +71,7 @@ historyserver.archive.fs.refresh-interval: 10000 The contained archives are downloaded and cached in the local filesystem. The local directory for this is configured via `historyserver.web.tmpdir`. -Check out the configuration page for a [complete list of configuration options]({{ site.baseurl }}/setup/config.html#history-server). +Check out the configuration page for a [complete list of configuration options]({{ site.baseurl }}/ops/config.html#history-server). ## Available Requests