[FLINK-7301] [docs] Rework state documentation

This closes #4441.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47070674
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47070674
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47070674

Branch: refs/heads/master
Commit: 470706740d5d89de69844a5662166ce94d71d00d
Parents: 2ed74ca
Author: twalthr <twal...@apache.org>
Authored: Mon Jul 31 20:14:31 2017 +0200
Committer: twalthr <twal...@apache.org>
Committed: Tue Aug 8 14:05:51 2017 +0200

----------------------------------------------------------------------
 docs/check_links.sh | 5 +-
 docs/concepts/programming-model.md | 4 +-
 docs/concepts/runtime.md | 10 +-
 docs/dev/api_concepts.md | 4 +-
 docs/dev/batch/connectors.md | 2 +-
 docs/dev/batch/python.md | 2 +-
 docs/dev/cluster_execution.md | 2 +-
 docs/dev/connectors/index.md | 2 +-
 docs/dev/connectors/kafka.md | 2 +-
 docs/dev/datastream_api.md | 4 +-
 docs/dev/execution_configuration.md | 2 +-
 docs/dev/libs/storm_compatibility.md | 2 +-
 docs/dev/linking_with_flink.md | 2 +-
 docs/dev/local_execution.md | 2 +-
 docs/dev/migration.md | 8 +-
 docs/dev/packaging.md | 2 +-
 docs/dev/parallel.md | 4 +-
 docs/dev/stream/checkpointing.md | 174 -----
 docs/dev/stream/process_function.md | 2 +-
 docs/dev/stream/queryable_state.md | 291 --------
 docs/dev/stream/state.md | 768 ---------------------
 docs/dev/stream/state/checkpointing.md | 174 +++++
 docs/dev/stream/state/custom_serialization.md | 188 +++++
 docs/dev/stream/state/index.md | 56 ++
 docs/dev/stream/state/queryable_state.md | 292 ++++++++
 docs/dev/stream/state/state.md | 596 ++++++++++++++++
 docs/dev/stream/state/state_backends.md | 46 ++
 docs/dev/table/index.md | 2 +-
 docs/internals/components.md | 4 +-
 docs/internals/stream_checkpointing.md | 8 +-
 docs/internals/task_lifecycle.md | 4 +-
 docs/monitoring/debugging_classloading.md | 2 +-
 docs/monitoring/historyserver.md | 2 +-
 docs/monitoring/large_state_tuning.md | 237 -------
 docs/ops/README.md | 21 -
 docs/ops/cli.md | 355 ++++++++++
 docs/ops/config.md | 713 +++++++++++++++++++
 docs/ops/deployment/aws.md | 374 ++++++++++
 docs/ops/deployment/cluster_setup.md | 151 ++++
 docs/ops/deployment/docker.md | 102 +++
 docs/ops/deployment/gce_setup.md | 93 +++
 docs/ops/deployment/index.md | 24 +
 docs/ops/deployment/kubernetes.md | 157 +++++
 docs/ops/deployment/mapr_setup.md | 132 ++++
 docs/ops/deployment/mesos.md | 269 ++++++++
 docs/ops/deployment/yarn_setup.md | 338 +++++++++
 docs/ops/index.md | 25 +
 docs/ops/jobmanager_high_availability.md | 239 +++++++
 docs/ops/production_ready.md | 6 +-
 docs/ops/security-kerberos.md | 8 +-
 docs/ops/security-ssl.md | 144 ++++
 docs/ops/state/checkpoints.md | 101 +++
 docs/ops/state/index.md | 24 +
 docs/ops/state/large_state_tuning.md | 237 +++++++
 docs/ops/state/savepoints.md | 198 ++++++
 docs/ops/state/state_backends.md | 169 +++++
 docs/ops/state_backends.md | 169 -----
 docs/ops/upgrading.md | 14 +-
 docs/quickstart/setup_quickstart.md | 2 +-
 docs/redirects/cli.md | 2 +-
 docs/redirects/fault_tolerance.md | 2 +-
 docs/redirects/savepoints.md | 2 +-
 docs/redirects/state.md | 2 +-
 docs/redirects/state_backends.md | 2 +-
 docs/setup/aws.md | 374 ----------
 docs/setup/building.md | 149 ----
 docs/setup/checkpoints.md | 101 ---
 docs/setup/cli.md | 355 ----------
 docs/setup/cluster_setup.md | 151 ----
 docs/setup/config.md | 713 -------------------
 docs/setup/deployment.md | 24 -
 docs/setup/docker.md | 102 ---
 docs/setup/flink_on_windows.md | 86 ---
 docs/setup/gce_setup.md | 93 ---
 docs/setup/index.md | 25 -
 docs/setup/jobmanager_high_availability.md | 239 -------
 docs/setup/kubernetes.md | 157 -----
 docs/setup/mapr_setup.md | 132 ----
 docs/setup/mesos.md | 269 --------
 docs/setup/savepoints.md | 198 ------
 docs/setup/security-ssl.md | 144 ----
 docs/setup/yarn_setup.md | 338 ---------
 docs/start/building.md | 149 ++++
 docs/start/flink_on_windows.md | 86 +++
 84 files changed, 5495 insertions(+), 5370 deletions(-)
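Most of the hunks in this commit only repoint links at pages that the diffstat above shows as moved. For readers who need to update links held outside the Flink repository, the moves can be sketched as a small lookup table. This is an illustrative sketch, not part of the commit: `MOVED` and `rewrite_link` are hypothetical names, the table covers only a subset of the renames, and `dev/stream/state.html` is left out because its content was split across several new pages. The helper also does not adjust the `../` depth of relative links.

```python
# Hypothetical helper, not part of the Flink repository.
# Old -> new page paths, read off the diffstat of this commit (subset only).
MOVED = {
    "setup/cli.html": "ops/cli.html",
    "setup/config.html": "ops/config.html",
    "setup/aws.html": "ops/deployment/aws.html",
    "setup/cluster_setup.html": "ops/deployment/cluster_setup.html",
    "setup/yarn_setup.html": "ops/deployment/yarn_setup.html",
    "setup/mesos.html": "ops/deployment/mesos.html",
    "setup/savepoints.html": "ops/state/savepoints.html",
    "setup/checkpoints.html": "ops/state/checkpoints.html",
    "setup/building.html": "start/building.html",
    "ops/state_backends.html": "ops/state/state_backends.html",
    "dev/stream/checkpointing.html": "dev/stream/state/checkpointing.html",
    "dev/stream/queryable_state.html": "dev/stream/state/queryable_state.html",
    "monitoring/large_state_tuning.html": "ops/state/large_state_tuning.html",
}


def rewrite_link(url: str) -> str:
    """Return url with a moved page path replaced; unknown links pass through."""
    for old, new in MOVED.items():
        if old in url:
            # Only the page path is swapped, so prefixes and #anchors survive.
            return url.replace(old, new)
    return url


if __name__ == "__main__":
    print(rewrite_link("../setup/cli.html#savepoints"))
    print(rewrite_link("{{ site.baseurl }}/setup/savepoints.html"))
```

Because only the page path is replaced, prefixes such as `{{ site.baseurl }}` and fragment anchors such as `#savepoints` are preserved, which matches how the hunks below rewrite their links.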
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/check_links.sh ---------------------------------------------------------------------- diff --git a/docs/check_links.sh b/docs/check_links.sh index c4307a5..5d9f762 100755 --- a/docs/check_links.sh +++ b/docs/check_links.sh @@ -31,6 +31,9 @@ fi # Fail the build if any broken links are found broken_links_str=$(grep -e 'Found [[:digit:]]\+ broken links' spider.log) if [ -n "$broken_links_str" ]; then - echo -e "\e[1;31m$broken_links_str\e[0m" + grep -B 1 "Remote file does not exist -- broken link!!!" spider.log + echo "---------------------------------------------------------------------------" + echo -e "$broken_links_str" + echo "Search for page containing broken link using 'grep -R BROKEN_PATH DOCS_DIR'" exit 1 fi http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/concepts/programming-model.md ---------------------------------------------------------------------- diff --git a/docs/concepts/programming-model.md b/docs/concepts/programming-model.md index 7b0cfb5..fd5ebee 100644 --- a/docs/concepts/programming-model.md +++ b/docs/concepts/programming-model.md @@ -171,7 +171,7 @@ This alignment also allows Flink to redistribute the state and adjust the stream <img src="../fig/state_partitioning.svg" alt="State and Partitioning" class="offset" width="50%" /> -For more information, see the documentation on [working with state](../dev/stream/state.html). +For more information, see the documentation on [state](../dev/stream/state/index.html). {% top %} @@ -188,7 +188,7 @@ of events that need to be replayed). The description of the [fault tolerance internals]({{ site.baseurl }}/internals/stream_checkpointing.html) provides more information about how Flink manages checkpoints and related topics. -Details about enabling and configuring checkpointing are in the [checkpointing API docs](../dev/stream/checkpointing.html). 
+Details about enabling and configuring checkpointing are in the [checkpointing API docs](../dev/stream/state/checkpointing.html). {% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/concepts/runtime.md ---------------------------------------------------------------------- diff --git a/docs/concepts/runtime.md b/docs/concepts/runtime.md index c598b12..cb6d58f 100644 --- a/docs/concepts/runtime.md +++ b/docs/concepts/runtime.md @@ -54,8 +54,8 @@ The Flink runtime consists of two types of processes: There must always be at least one TaskManager. -The JobManagers and TaskManagers can be started in various ways: directly on the machines as a [standalone cluster](../setup/cluster_setup.html), in -containers, or managed by resource frameworks like [YARN](../setup/yarn_setup.html) or [Mesos](../setup/mesos.html). +The JobManagers and TaskManagers can be started in various ways: directly on the machines as a [standalone cluster](../ops/deployment/cluster_setup.html), in +containers, or managed by resource frameworks like [YARN](../ops/deployment/yarn_setup.html) or [Mesos](../ops/deployment/mesos.html). TaskManagers connect to JobManagers, announcing themselves as available, and are assigned work. The **client** is not part of the runtime and program execution, but is used to prepare and send a dataflow to the JobManager. @@ -107,7 +107,7 @@ With hyper-threading, each slot then takes 2 or more hardware thread contexts. ## State Backends -The exact data structures in which the key/values indexes are stored depends on the chosen [state backend](../ops/state_backends.html). One state backend +The exact data structures in which the key/values indexes are stored depends on the chosen [state backend](../ops/state/state_backends.html). One state backend stores data in an in-memory hash map, another state backend uses [RocksDB](http://rocksdb.org) as the key/value store. 
In addition to defining the data structure that holds the state, the state backends also implement the logic to take a point-in-time snapshot of the key/value state and store that snapshot as part of a checkpoint. @@ -120,8 +120,8 @@ take a point-in-time snapshot of the key/value state and store that snapshot as Programs written in the Data Stream API can resume execution from a **savepoint**. Savepoints allow both updating your programs and your Flink cluster without losing any state. -[Savepoints](..//setup/savepoints.html) are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed. +[Savepoints](../ops/state/savepoints.html) are **manually triggered checkpoints**, which take a snapshot of the program and write it out to a state backend. They rely on the regular checkpointing mechanism for this. During execution programs are periodically snapshotted on the worker nodes and produce checkpoints. For recovery only the last completed checkpoint is needed and older checkpoints can be safely discarded as soon as a new one is completed. -Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed. Savepoints can be created from the [command line](../setup/cli.html#savepoints) or when cancelling a job via the [REST API](../monitoring/rest_api.html#cancel-job-with-savepoint). +Savepoints are similar to these periodic checkpoints except that they are **triggered by the user** and **don't automatically expire** when newer checkpoints are completed. 
Savepoints can be created from the [command line](../ops/cli.html#savepoints) or when cancelling a job via the [REST API](../monitoring/rest_api.html#cancel-job-with-savepoint). {% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/api_concepts.md ---------------------------------------------------------------------- diff --git a/docs/dev/api_concepts.md b/docs/dev/api_concepts.md index b3618aa..c447f27 100644 --- a/docs/dev/api_concepts.md +++ b/docs/dev/api_concepts.md @@ -100,7 +100,7 @@ will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the -[command line]({{ site.baseurl }}/setup/cli.html), the Flink cluster manager +[command line]({{ site.baseurl }}/ops/cli.html), the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return an execution environment for executing your program on a cluster. @@ -169,7 +169,7 @@ will do the right thing depending on the context: if you are executing your program inside an IDE or as a regular Java program it will create a local environment that will execute your program on your local machine. If you created a JAR file from your program, and invoke it through the -[command line]({{ site.baseurl }}/apis/cli.html), the Flink cluster manager +[command line]({{ site.baseurl }}/ops/cli.html), the Flink cluster manager will execute your main method and `getExecutionEnvironment()` will return an execution environment for executing your program on a cluster. 
http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/batch/connectors.md ---------------------------------------------------------------------- diff --git a/docs/dev/batch/connectors.md b/docs/dev/batch/connectors.md index b7a0718..93bbf72 100644 --- a/docs/dev/batch/connectors.md +++ b/docs/dev/batch/connectors.md @@ -58,7 +58,7 @@ In order to use a Hadoop file system with Flink, make sure that #### Amazon S3 -See [Deployment & Operations - Deployment - AWS - S3: Simple Storage Service]({{ site.baseurl }}/setup/aws.html) for available S3 file system implementations, their configuration and required libraries. +See [Deployment & Operations - Deployment - AWS - S3: Simple Storage Service]({{ site.baseurl }}/ops/deployment/aws.html) for available S3 file system implementations, their configuration and required libraries. #### Alluxio http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/batch/python.md ---------------------------------------------------------------------- diff --git a/docs/dev/batch/python.md b/docs/dev/batch/python.md index c4c2671..0383f54 100644 --- a/docs/dev/batch/python.md +++ b/docs/dev/batch/python.md @@ -615,7 +615,7 @@ env.execute() A system-wide default parallelism for all execution environments can be defined by setting the `parallelism.default` property in `./conf/flink-conf.yaml`. See the -[Configuration]({{ site.baseurl }}/setup/config.html) documentation for details. +[Configuration]({{ site.baseurl }}/ops/config.html) documentation for details. 
{% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/cluster_execution.md ---------------------------------------------------------------------- diff --git a/docs/dev/cluster_execution.md b/docs/dev/cluster_execution.md index d614846..03af637 100644 --- a/docs/dev/cluster_execution.md +++ b/docs/dev/cluster_execution.md @@ -33,7 +33,7 @@ are two ways to send a program to a cluster for execution: The command line interface lets you submit packaged programs (JARs) to a cluster (or single machine setup). -Please refer to the [Command Line Interface]({{ site.baseurl }}/setup/cli.html) documentation for +Please refer to the [Command Line Interface]({{ site.baseurl }}/ops/cli.html) documentation for details. ## Remote Environment http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/connectors/index.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md index 177e601..00c0853 100644 --- a/docs/dev/connectors/index.md +++ b/docs/dev/connectors/index.md @@ -80,5 +80,5 @@ When a Flink application pushes a lot of data to an external data store, this can become an I/O bottleneck. If the data involved has many fewer reads than writes, a better approach can be for an external application to pull from Flink the data it needs. -The [Queryable State]({{ site.baseurl }}/dev/stream/queryable_state.html) interface +The [Queryable State]({{ site.baseurl }}/dev/stream/state/queryable_state.html) interface enables this by allowing the state being managed by Flink to be queried on demand. 
http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/connectors/kafka.md ---------------------------------------------------------------------- diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index d4e8978..042ad11 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -573,5 +573,5 @@ Once Kerberos-based Flink security is enabled, you can authenticate to Kafka wit When using standalone Flink deployment, you can also use `SASL_SSL`; please see how to configure the Kafka client for SSL [here](https://kafka.apache.org/documentation/#security_configclients). - Set `sasl.kerberos.service.name` to `kafka` (default `kafka`): The value for this should match the `sasl.kerberos.service.name` used for Kafka broker configurations. A mismatch in service name between client and server configuration will cause the authentication to fail. -For more information on Flink configuration for Kerberos security, please see [here]({{ site.baseurl}}/setup/config.html). +For more information on Flink configuration for Kerberos security, please see [here]({{ site.baseurl}}/ops/config.html). You can also find [here]({{ site.baseurl}}/ops/security-kerberos.html) further details on how Flink internally setups Kerberos-based security. http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/datastream_api.md ---------------------------------------------------------------------- diff --git a/docs/dev/datastream_api.md b/docs/dev/datastream_api.md index 7191d82..8b3899b 100644 --- a/docs/dev/datastream_api.md +++ b/docs/dev/datastream_api.md @@ -1158,7 +1158,7 @@ previous transformation. For example, you can use `someStream.map(...).startNewC you cannot use `someStream.startNewChain()`. A resource group is a slot in Flink, see -[slots]({{site.baseurl}}/setup/config.html#configuring-taskmanager-processing-slots). You can +[slots]({{site.baseurl}}/ops/config.html#configuring-taskmanager-processing-slots). 
You can manually isolate operators in separate slots if desired. <div class="codetabs" markdown="1"> @@ -1604,7 +1604,7 @@ for an explanation of most parameters. These parameters pertain specifically to ### Fault Tolerance -[State & Checkpointing]({{ site.baseurl }}/dev/stream/checkpointing.html) describes how to enable and configure Flink's checkpointing mechanism. +[State & Checkpointing]({{ site.baseurl }}/dev/stream/state/checkpointing.html) describes how to enable and configure Flink's checkpointing mechanism. ### Controlling Latency http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/execution_configuration.md ---------------------------------------------------------------------- diff --git a/docs/dev/execution_configuration.md b/docs/dev/execution_configuration.md index 94e788c..2316450 100644 --- a/docs/dev/execution_configuration.md +++ b/docs/dev/execution_configuration.md @@ -23,7 +23,7 @@ under the License. --> The `StreamExecutionEnvironment` contains the `ExecutionConfig` which allows to set job specific configuration values for the runtime. -To change the defaults that affect all jobs, see [Configuration]({{ site.baseurl }}/setup/config.html). +To change the defaults that affect all jobs, see [Configuration]({{ site.baseurl }}/ops/config.html). <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/libs/storm_compatibility.md ---------------------------------------------------------------------- diff --git a/docs/dev/libs/storm_compatibility.md b/docs/dev/libs/storm_compatibility.md index 89d7706..6b24dc0 100644 --- a/docs/dev/libs/storm_compatibility.md +++ b/docs/dev/libs/storm_compatibility.md @@ -134,7 +134,7 @@ DataStream<String> rawInput = env.addSource( If a Spout emits a finite number of tuples, `SpoutWrapper` can be configures to terminate automatically by setting `numberOfInvocations` parameter in its constructor. 
This allows the Flink program to shut down automatically after all data is processed. -Per default the program will run until it is [canceled]({{site.baseurl}}/setup/cli.html) manually. +Per default the program will run until it is [canceled]({{site.baseurl}}/ops/cli.html) manually. ## Embed Bolts http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/linking_with_flink.md ---------------------------------------------------------------------- diff --git a/docs/dev/linking_with_flink.md b/docs/dev/linking_with_flink.md index 73ca677..3f55b9e 100644 --- a/docs/dev/linking_with_flink.md +++ b/docs/dev/linking_with_flink.md @@ -126,7 +126,7 @@ to run your program on Flink with Scala 2.11, you need to add a `_2.11` suffix t values of the Flink modules in your dependencies section. If you are looking for building Flink with Scala 2.11, please check -[build guide]({{ site.baseurl }}/setup/building.html#scala-versions). +[build guide]({{ site.baseurl }}/start/building.html#scala-versions). #### Hadoop Dependency Versions http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/local_execution.md ---------------------------------------------------------------------- diff --git a/docs/dev/local_execution.md b/docs/dev/local_execution.md index 45a39e3..cf89956 100644 --- a/docs/dev/local_execution.md +++ b/docs/dev/local_execution.md @@ -57,7 +57,7 @@ The `LocalEnvironment` is a handle to local execution for Flink programs. Use it The local environment is instantiated via the method `ExecutionEnvironment.createLocalEnvironment()`. By default, it will use as many local threads for execution as your machine has CPU cores (hardware contexts). You can alternatively specify the desired parallelism. The local environment can be configured to log to the console using `enableLogging()`/`disableLogging()`. -In most cases, calling `ExecutionEnvironment.getExecutionEnvironment()` is the even better way to go. 
That method returns a `LocalEnvironment` when the program is started locally (outside the command line interface), and it returns a pre-configured environment for cluster execution, when the program is invoked by the [command line interface]({{ site.baseurl }}/setup/cli.html). +In most cases, calling `ExecutionEnvironment.getExecutionEnvironment()` is the even better way to go. That method returns a `LocalEnvironment` when the program is started locally (outside the command line interface), and it returns a pre-configured environment for cluster execution, when the program is invoked by the [command line interface]({{ site.baseurl }}/ops/cli.html). ~~~java public static void main(String[] args) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/migration.md ---------------------------------------------------------------------- diff --git a/docs/dev/migration.md b/docs/dev/migration.md index cf8c8f8..3369a2c 100644 --- a/docs/dev/migration.md +++ b/docs/dev/migration.md @@ -37,7 +37,7 @@ This would be relevant mostly for users implementing custom `TypeSerializer`s fo Since Flink 1.3, two additional methods have been added that are related to serializer compatibility across savepoint restores. Please see -[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state.html#handling-serializer-upgrades-and-compatibility) +[Handling serializer upgrades and compatibility]({{ site.baseurl }}/dev/stream/state/custom_serialization.html#handling-serializer-upgrades-and-compatibility) for further details on how to implement these methods. ### `ProcessFunction` is always a `RichFunction` @@ -75,7 +75,7 @@ For other custom projects, make sure to add logger dependencies. 
For example, in ## Migrating from Flink 1.1 to Flink 1.2 -As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state.html), Flink has two types of state: +As mentioned in the [State documentation]({{ site.baseurl }}/dev/stream/state/state.html), Flink has two types of state: **keyed** and **non-keyed** state (also called **operator** state). Both types are available to both operators and user-defined functions. This document will guide you through the process of migrating your Flink 1.1 function code to Flink 1.2 and will present some important internal changes introduced in Flink 1.2 that concern the @@ -89,7 +89,7 @@ The migration process will serve two goals: Flink 1.1 predecessor. After following the steps in this guide, you will be able to migrate your running job from Flink 1.1 to Flink 1.2 -simply by taking a [savepoint]({{ site.baseurl }}/setup/savepoints.html) with your Flink 1.1 job and giving it to +simply by taking a [savepoint]({{ site.baseurl }}/ops/state/savepoints.html) with your Flink 1.1 job and giving it to your Flink 1.2 job as a starting point. This will allow the Flink 1.2 job to resume execution from where its Flink 1.1 predecessor left off. @@ -203,7 +203,7 @@ contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism while `(test2, 2)` will go to task 1. More details on the principles behind rescaling of both keyed state and non-keyed state can be found in -the [State documentation]({{ site.baseurl }}/dev/stream/state.html). +the [State documentation]({{ site.baseurl }}/dev/stream/state/index.html). ##### ListCheckpointed http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/packaging.md ---------------------------------------------------------------------- diff --git a/docs/dev/packaging.md b/docs/dev/packaging.md index ee351ae..e83d9ac 100644 --- a/docs/dev/packaging.md +++ b/docs/dev/packaging.md @@ -27,7 +27,7 @@ under the License. 
As described earlier, Flink programs can be executed on clusters by using a `remote environment`. Alternatively, programs can be packaged into JAR Files (Java Archives) for execution. Packaging the program is a prerequisite to executing them through the -[command line interface]({{ site.baseurl }}/setup/cli.html). +[command line interface]({{ site.baseurl }}/ops/cli.html). ### Packaging Programs http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/parallel.md ---------------------------------------------------------------------- diff --git a/docs/dev/parallel.md b/docs/dev/parallel.md index 549481f..ae6f863 100644 --- a/docs/dev/parallel.md +++ b/docs/dev/parallel.md @@ -27,7 +27,7 @@ program consists of multiple tasks (transformations/operators, data sources, and several parallel instances for execution and each parallel instance processes a subset of the task's input data. The number of parallel instances of a task is called its *parallelism*. -If you want to use [savepoints]({{ site.baseurl }}/setup/savepoints.html) you should also consider +If you want to use [savepoints]({{ site.baseurl }}/ops/state/savepoints.html) you should also consider setting a maximum parallelism (or `max parallelism`). When restoring from a savepoint you can change the parallelism of specific operators or the whole program and this setting specifies an upper bound on the parallelism. This is required because Flink internally partitions state @@ -181,7 +181,7 @@ try { A system-wide default parallelism for all execution environments can be defined by setting the `parallelism.default` property in `./conf/flink-conf.yaml`. See the -[Configuration]({{ site.baseurl }}/setup/config.html) documentation for details. +[Configuration]({{ site.baseurl }}/ops/config.html) documentation for details. 
## Setting the Maximum Parallelism http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/checkpointing.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/checkpointing.md b/docs/dev/stream/checkpointing.md deleted file mode 100644 index 4fe06c1..0000000 --- a/docs/dev/stream/checkpointing.md +++ /dev/null @@ -1,174 +0,0 @@ ---- -title: "Checkpointing" -nav-parent_id: streaming -nav-pos: 50 ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -* ToC -{:toc} - -Every function and operator in Flink can be **stateful** (see [working with state](state.html) for details). -Stateful functions store data across the processing of individual elements/events, making state a critical building block for -any type of more elaborate operation. - -In order to make state fault tolerant, Flink needs to **checkpoint** the state. Checkpoints allow Flink to recover state and positions -in the streams to give the application the same semantics as a failure-free execution. - -The [documentation on streaming fault tolerance](../../internals/stream_checkpointing.html) describes in detail the technique behind Flink's streaming fault tolerance mechanism. 
- - -## Prerequisites - -Flink's checkpointing mechanism interacts with durable storage for streams and state. In general, it requires: - - - A *persistent* (or *durable*) data source that can replay records for a certain amount of time. Examples for such sources are persistent messages queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file systems (e.g., HDFS, S3, GFS, NFS, Ceph, ...). - - A persistent storage for state, typically a distributed filesystem (e.g., HDFS, S3, GFS, NFS, Ceph, ...) - - -## Enabling and Configuring Checkpointing - -By default, checkpointing is disabled. To enable checkpointing, call `enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the checkpoint interval in milliseconds. - -Other parameters for checkpointing include: - - - *exactly-once vs. at-least-once*: You can optionally pass a mode to the `enableCheckpointing(n)` method to choose between the two guarantee levels. - Exactly-once is preferrable for most applications. At-least-once may be relevant for certain super-low-latency (consistently few milliseconds) applications. - - - *checkpoint timeout*: The time after which a checkpoint-in-progress is aborted, if it did not complete by then. - - - *minimum time between checkpoints*: To make sure that the streaming application makes a certain amount of progress between checkpoints, - one can define how much time needs to pass between checkpoints. If this value is set for example to *5000*, the next checkpoint will be - started no sooner than 5 seconds after the previous checkpoint completed, regardless of the checkpoint duration and the checkpoint interval. - Note that this implies that the checkpoint interval will never be smaller than this parameter. 
- - It is often easier to configure applications by defining the "time between checkpoints" then the checkpoint interval, because the "time between checkpoints" - is not susceptible to the fact that checkpoints may sometimes take longer than on average (for example if the target storage system is temporarily slow). - - Note that this value also implies that the number of concurrent checkpoints is *one*. - - - *number of concurrent checkpoints*: By default, the system will not trigger another checkpoint while one is still in progress. - This ensures that the topology does not spend too much time on checkpoints and not make progress with processing the streams. - It is possible to allow for multiple overlapping checkpoints, which is interesting for pipelines that have a certain processing delay - (for example because the functions call external services that need some time to respond) but that still want to do very frequent checkpoints - (100s of milliseconds) to re-process very little upon failures. - - This option cannot be used when a minimum time between checkpoints is defined. - - - *externalized checkpoints*: You can configure periodic checkpoints to be persisted externally. Externalized checkpoints write their meta data out to persistent storage and are *not* automatically cleaned up when the job fails. This way, you will have a checkpoint around to resume from if your job fails. There are more details in the [deployment notes on externalized checkpoints](../../setup/checkpoints.html#externalized-checkpoints). 
- -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} -StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - -// start a checkpoint every 1000 ms -env.enableCheckpointing(1000); - -// advanced options: - -// set mode to exactly-once (this is the default) -env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); - -// make sure 500 ms of progress happen between checkpoints -env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); - -// checkpoints have to complete within one minute, or are discarded -env.getCheckpointConfig().setCheckpointTimeout(60000); - -// allow only one checkpoint to be in progress at the same time -env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); - -// enable externalized checkpoints which are retained after job cancellation -env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); -{% endhighlight %} -</div> -<div data-lang="scala" markdown="1"> -{% highlight scala %} -val env = StreamExecutionEnvironment.getExecutionEnvironment() - -// start a checkpoint every 1000 ms -env.enableCheckpointing(1000) - -// advanced options: - -// set mode to exactly-once (this is the default) -env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) - -// make sure 500 ms of progress happen between checkpoints -env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500) - -// checkpoints have to complete within one minute, or are discarded -env.getCheckpointConfig.setCheckpointTimeout(60000) - -// allow only one checkpoint to be in progress at the same time -env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) -{% endhighlight %} -</div> -</div> - -### Related Config Options - -Some more parameters and/or defaults may be set via `conf/flink-conf.yaml` (see [configuration](config.html) for a full guide): - -- `state.backend`: The backend that will be used to store operator 
state checkpoints if checkpointing is enabled. Supported backends: - - `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging. - - `filesystem`: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, ... - -- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use `file://` only for local setups. - -- `state.backend.rocksdb.checkpointdir`: The local directory for storing RocksDB files, or a list of directories separated by the system's directory delimiter (for example ':' (colon) on Linux/Unix). (DEFAULT value is `taskmanager.tmp.dirs`) - -- `state.checkpoints.dir`: The target directory for meta data of [externalized checkpoints](../../setup/checkpoints.html#externalized-checkpoints). - -- `state.checkpoints.num-retained`: The number of completed checkpoint instances to retain. Having more than one allows recovery fallback to an earlier checkpoint if the latest checkpoint is corrupt. (Default: 1) - -{% top %} - - -## Selecting a State Backend - -Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html) stores consistent snapshots -of all the state in timers and stateful operators, including connectors, windows, and any [user-defined state](state.html). -Where the checkpoints are stored (e.g., JobManager memory, file system, database) depends on the configured -**State Backend**. - -By default, state is kept in memory in the TaskManagers and checkpoints are stored in memory in the JobManager. For proper persistence of large state, -Flink supports various approaches for storing and checkpointing state in other state backends.
The choice of state backend can be configured via `StreamExecutionEnvironment.setStateBackend(…)`. - -See [state backends](../../ops/state_backends.html) for more details on the available state backends and options for job-wide and cluster-wide configuration. - - -## State Checkpoints in Iterative Jobs - -Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: `env.enableCheckpointing(interval, force = true)`. - -Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during failure. - -{% top %} - - -## Restart Strategies - -Flink supports different restart strategies which control how the jobs are restarted in case of a failure. For more -information, see [Restart Strategies]({{ site.baseurl }}/dev/restart_strategies.html). - -{% top %} - http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/process_function.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/process_function.md b/docs/dev/stream/process_function.md index fb5f39d..696a8b8 100644 --- a/docs/dev/stream/process_function.md +++ b/docs/dev/stream/process_function.md @@ -38,7 +38,7 @@ all (acyclic) streaming applications: The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events by being invoked for each event received in the input stream(s). -For fault-tolerant state, the `ProcessFunction` gives access to Flink's [keyed state](state.html), accessible via the +For fault-tolerant state, the `ProcessFunction` gives access to Flink's [keyed state](state/state.html), accessible via the `RuntimeContext`, similar to the way other stateful functions can access keyed state.
The timers allow applications to react to changes in processing time and in [event time](../event_time.html). http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/queryable_state.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/queryable_state.md b/docs/dev/stream/queryable_state.md deleted file mode 100644 index 234be51..0000000 --- a/docs/dev/stream/queryable_state.md +++ /dev/null @@ -1,291 +0,0 @@ ---- -title: "Queryable State" -nav-parent_id: streaming -nav-pos: 61 ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -* ToC -{:toc} - -<div class="alert alert-warning"> - <strong>Note:</strong> The client APIs for queryable state are currently in an evolving state and - there are <strong>no guarantees</strong> made about stability of the provided interfaces. It is - likely that there will be breaking API changes on the client side in the upcoming Flink versions. -</div> - -In a nutshell, this feature allows users to query Flink's managed partitioned state -(see [Working with State]({{ site.baseurl }}/dev/stream/state.html)) from outside of -Flink. 
For some scenarios, queryable state thus eliminates the need for distributed -operations/transactions with external systems such as key-value stores which are often the -bottleneck in practice. - -<div class="alert alert-warning"> - <strong>Attention:</strong> Queryable state accesses keyed state from a concurrent thread rather - than synchronizing with the operator and potentially blocking its operation. Since any state - backend using Java heap space, e.g. MemoryStateBackend or - FsStateBackend, does not work with copies when retrieving values but instead directly - references the stored values, read-modify-write patterns are unsafe and may cause the - queryable state server to fail due to concurrent modifications. - The RocksDBStateBackend is safe from these issues. -</div> - -## Making State Queryable - -In order to make state queryable, the queryable state server first needs to be enabled globally -by setting the `query.server.enable` configuration parameter to `true` (current default). -Then appropriate state needs to be made queryable by using either - -* a `QueryableStateStream`, a convenience object which behaves like a sink and offers incoming values as -queryable state, or -* `StateDescriptor#setQueryable(String queryableStateName)`, which makes the keyed state of an -operator queryable. - -The following sections explain the use of these two approaches. 
- -### Queryable State Stream - -A `KeyedStream` may offer its values as queryable state by using the following methods: - -{% highlight java %} -// ValueState -QueryableStateStream asQueryableState( - String queryableStateName, - ValueStateDescriptor stateDescriptor) - -// Shortcut for explicit ValueStateDescriptor variant -QueryableStateStream asQueryableState(String queryableStateName) - -// FoldingState -QueryableStateStream asQueryableState( - String queryableStateName, - FoldingStateDescriptor stateDescriptor) - -// ReducingState -QueryableStateStream asQueryableState( - String queryableStateName, - ReducingStateDescriptor stateDescriptor) -{% endhighlight %} - - -<div class="alert alert-info"> - <strong>Note:</strong> There is no queryable <code>ListState</code> sink as it would result in an ever-growing - list which may not be cleaned up and thus will eventually consume too much memory. -</div> - -A call to these methods returns a `QueryableStateStream`, which cannot be further transformed and -currently only holds the name as well as the value and key serializer for the queryable state -stream. It is comparable to a sink, and cannot be followed by further transformations. - -Internally a `QueryableStateStream` gets translated to an operator which uses all incoming -records to update the queryable state instance. -In a program like the following, all records of the keyed stream will be used to update the state -instance, either via `ValueState#update(value)` or `AppendingState#add(value)`, depending on -the chosen state variant: -{% highlight java %} -stream.keyBy(0).asQueryableState("query-name") -{% endhighlight %} -This acts like the Scala API's `flatMapWithState`. 
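The update behaviour described above can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: `QueryableSinkSketch`, `accept` and `query` are hypothetical names that are not part of Flink, and a `HashMap` stands in for the state backend. Passing `null` as the reducer models the `ValueState#update(value)` variant (the latest record wins); passing a reducer models the `AppendingState#add(value)` variant as used by a `ReducingState`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical toy model, NOT Flink code: a sink that keeps one value per key
// and lets callers read the current value for a key.
final class QueryableSinkSketch<K, V> {

    // Stand-in for keyed state: one entry per key.
    private final Map<K, V> state = new HashMap<>();

    // null => value-state semantics (overwrite); otherwise reduce into the stored value.
    private final BinaryOperator<V> reducer;

    QueryableSinkSketch(BinaryOperator<V> reducer) {
        this.reducer = reducer;
    }

    // Called once per incoming record of the keyed stream.
    void accept(K key, V value) {
        if (reducer == null) {
            state.put(key, value);            // models ValueState#update(value)
        } else {
            state.merge(key, value, reducer); // models AppendingState#add(value) with a reduce function
        }
    }

    // Models an external query for the current state of one key.
    V query(K key) {
        return state.get(key);
    }
}
```

In this model, feeding the records `("a", 1)` and `("a", 5)` leaves `5` under key `a` with overwrite semantics and `6` when the reducer is `Integer::sum`.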
- -### Managed Keyed State - -Managed keyed state of an operator -(see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state.html#using-managed-keyed-state)) -can be made queryable by making the appropriate state descriptor queryable via -`StateDescriptor#setQueryable(String queryableStateName)`, as in the example below: -{% highlight java %} -ValueStateDescriptor<Tuple2<Long, Long>> descriptor = - new ValueStateDescriptor<>( - "average", // the state name - TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information - Tuple2.of(0L, 0L)); // default value of the state, if nothing was set -descriptor.setQueryable("query-name"); // queryable state name -{% endhighlight %} -<div class="alert alert-info"> - <strong>Note:</strong> The `queryableStateName` parameter may be chosen arbitrarily and is only - used for queries. It does not have to be identical to the state's own name. -</div> - - -## Querying State - -The `QueryableStateClient` helper class may be used for queries against the `KvState` instances that -serve the state internally. 
It needs to be set up with a valid `JobManager` address and port and is -created as follows: - -{% highlight java %} -final Configuration config = new Configuration(); -config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress); -config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort); - -final HighAvailabilityServices highAvailabilityServices = - HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, - Executors.newSingleThreadScheduledExecutor(), - HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); - -QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices); -{% endhighlight %} - -The query method is this: - -{% highlight java %} -Future<byte[]> getKvState( - JobID jobID, - String queryableStateName, - int keyHashCode, - byte[] serializedKeyAndNamespace) -{% endhighlight %} - -A call to this method returns a `Future` eventually holding the serialized state value for the -queryable state instance identified by `queryableStateName` of the job with ID `jobID`. The -`keyHashCode` is the hash code of the key as returned by `Object.hashCode()` and the -`serializedKeyAndNamespace` is the serialized key and namespace. -<div class="alert alert-info"> - <strong>Note:</strong> The client is asynchronous and can be shared by multiple threads. It needs - to be shutdown via <code>QueryableStateClient#shutdown()</code> when unused in order to free - resources. -</div> - -The current implementation is still pretty low-level in the sense that it only works with -serialized data both for providing the key/namespace and the returned results. It is the -responsibility of the user (or some follow-up utilities) to set up the serializers for this. The -nice thing about this is that the query services don't have to get into the business of worrying -about any class loading issues etc. 
- -There are some serialization utils for key/namespace and value serialization included in -`KvStateRequestSerializer`. - -### Example - -The following example extends the `CountWindowAverage` example -(see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state.html#using-managed-keyed-state)) -by making it queryable and showing how to query this value: - -{% highlight java %} -public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { - - private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum; - - @Override - public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { - Tuple2<Long, Long> currentSum = sum.value(); - currentSum.f0 += 1; - currentSum.f1 += input.f1; - sum.update(currentSum); - - if (currentSum.f0 >= 2) { - out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); - sum.clear(); - } - } - - @Override - public void open(Configuration config) { - ValueStateDescriptor<Tuple2<Long, Long>> descriptor = - new ValueStateDescriptor<>( - "average", // the state name - TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information - Tuple2.of(0L, 0L)); // default value of the state, if nothing was set - descriptor.setQueryable("query-name"); - sum = getRuntimeContext().getState(descriptor); - } -} -{% endhighlight %} - -Once used in a job, you can retrieve the job ID and then query any key's current state from this operator: - -{% highlight java %} -final Configuration config = new Configuration(); -config.setString(JobManagerOptions.ADDRESS, queryAddress); -config.setInteger(JobManagerOptions.PORT, queryPort); - -final HighAvailabilityServices highAvailabilityServices = - HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, - Executors.newSingleThreadScheduledExecutor(), - HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION); - -QueryableStateClient client = new 
QueryableStateClient(config, highAvailabilityServices); - -final TypeSerializer<Long> keySerializer = - TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig()); -final TypeSerializer<Tuple2<Long, Long>> valueSerializer = - TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig()); - -final byte[] serializedKey = - KvStateRequestSerializer.serializeKeyAndNamespace( - key, keySerializer, - VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE); - -Future<byte[]> serializedResult = - client.getKvState(jobId, "query-name", key.hashCode(), serializedKey); - -// now wait for the result and return it -final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS); -byte[] serializedValue = Await.result(serializedResult, duration); -Tuple2<Long, Long> value = - KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer); -{% endhighlight %} - -### Note for Scala Users - -Please use the available Scala extensions when creating the `TypeSerializer` instances. Add the following import: - -```scala -import org.apache.flink.streaming.api.scala._ -``` - -Now you can create the type serializers as follows: - -```scala -val keySerializer = createTypeInformation[Long] - .createSerializer(new ExecutionConfig) -``` - -If you don't do this, you can run into mismatches between the serializers used in the Flink job and in your client code, because types like `scala.Long` cannot be captured at runtime. - -## Configuration - -The following configuration parameters influence the behaviour of the queryable state server and client. -They are defined in `QueryableStateOptions`.
- -### Server -* `query.server.enable`: flag to indicate whether to start the queryable state server -* `query.server.port`: port to bind to the internal `KvStateServer` (0 => pick random available port) -* `query.server.network-threads`: number of network (event loop) threads for the `KvStateServer` (0 => #slots) -* `query.server.query-threads`: number of asynchronous query threads for the `KvStateServerHandler` (0 => #slots). - -### Client (`QueryableStateClient`) -* `query.client.network-threads`: number of network (event loop) threads for the `KvStateClient` (0 => number of available cores) -* `query.client.lookup.num-retries`: number of retries on location lookup failures -* `query.client.lookup.retry-delay`: retry delay on location lookup failures (millis) - -## Limitations - -* The queryable state life-cycle is bound to the life-cycle of the job, e.g. tasks register -queryable state on startup and unregister it on disposal. In future versions, it is desirable to -decouple this in order to allow queries after a task finishes, and to speed up recovery via state -replication. -* Notifications about available KvState happen via a simple tell. In the future this should be improved to be -more robust with asks and acknowledgements. -* The server and client keep track of statistics for queries. These are currently disabled by -default as they would not be exposed anywhere. As soon as there is better support to publish these -numbers via the Metrics system, we should enable the stats. 
http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state.md ---------------------------------------------------------------------- diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md deleted file mode 100644 index dd61c74..0000000 --- a/docs/dev/stream/state.md +++ /dev/null @@ -1,768 +0,0 @@ ---- -title: "Working with State" -nav-parent_id: streaming -nav-pos: 40 ---- -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -* ToC -{:toc} - -Stateful functions and operators store data across the processing of individual elements/events, making state a critical building block for -any type of more elaborate operation. For example: - - - When an application searches for certain event patterns, the state will store the sequence of events encountered so far. - - When aggregating events per minute, the state holds the pending aggregates. - - When training a machine learning model over a stream of data points, the state holds the current version of the model parameters. - -In order to make state fault tolerant, Flink needs to be aware of the state and [checkpoint](checkpointing.html) it. 
-In many cases, Flink can also *manage* the state for the application, meaning Flink deals with the memory management (possibly spilling to disk -if necessary) to allow applications to hold very large state. - -This document explains how to use Flink's state abstractions when developing an application. - - -## Keyed State and Operator State - -There are two basic kinds of state in Flink: `Keyed State` and `Operator State`. - -### Keyed State - -*Keyed State* is always relative to keys and can only be used in functions and operators on a `KeyedStream`. - -You can think of Keyed State as Operator State that has been partitioned, -or sharded, with exactly one state-partition per key. -Each keyed-state is logically bound to a unique -composite of <parallel-operator-instance, key>, and since each key -"belongs" to exactly one parallel instance of a keyed operator, we can -think of this simply as <operator, key>. - -Keyed State is further organized into so-called *Key Groups*. Key Groups are the -atomic unit by which Flink can redistribute Keyed State; -there are exactly as many Key Groups as the defined maximum parallelism. -During execution each parallel instance of a keyed operator works with the keys -for one or more Key Groups. - -### Operator State - -With *Operator State* (or *non-keyed state*), each operator state is -bound to one parallel operator instance. -The [Kafka Connector](../connectors/kafka.html) is a good motivating example for the use of Operator State -in Flink. Each parallel instance of the Kafka consumer maintains a map -of topic partitions and offsets as its Operator State. - -The Operator State interfaces support redistributing state among -parallel operator instances when the parallelism is changed. There can be different schemes for doing this redistribution. - -## Raw and Managed State - -*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*. 
- -*Managed State* is represented in data structures controlled by the Flink runtime, such as internal hash tables, or RocksDB. -Examples are "ValueState", "ListState", etc. Flink's runtime encodes -the states and writes them into the checkpoints. - -*Raw State* is state that operators keep in their own data structures. When checkpointed, they only write a sequence of bytes into -the checkpoint. Flink knows nothing about the state's data structures and sees only the raw bytes. - -All datastream functions can use managed state, but the raw state interfaces can only be used when implementing operators. -Using managed state (rather than raw state) is recommended, since with -managed state Flink is able to automatically redistribute state when the parallelism is -changed, and also do better memory management. - -## Using Managed Keyed State - -The managed keyed state interface provides access to different types of state that are all scoped to -the key of the current input element. This means that this type of state can only be used -on a `KeyedStream`, which can be created via `stream.keyBy(…)`. - -Now, we will first look at the different types of state available and then we will see -how they can be used in a program. The available state primitives are: - -* `ValueState<T>`: This keeps a value that can be updated and -retrieved (scoped to key of the input element as mentioned above, so there will possibly be one value -for each key that the operation sees). The value can be set using `update(T)` and retrieved using -`T value()`. - -* `ListState<T>`: This keeps a list of elements. You can append elements and retrieve an `Iterable` -over all currently stored elements. Elements are added using `add(T)`, the Iterable can -be retrieved using `Iterable<T> get()`. - -* `ReducingState<T>`: This keeps a single value that represents the aggregation of all values -added to the state.
The interface is the same as for `ListState` but elements added using -`add(T)` are reduced to an aggregate using a specified `ReduceFunction`. - -* `FoldingState<T, ACC>`: This keeps a single value that represents the aggregation of all values -added to the state. Contrary to `ReducingState`, the aggregate type may be different from the type -of elements that are added to the state. The interface is the same as for `ListState` but elements -added using `add(T)` are folded into an aggregate using a specified `FoldFunction`. - -* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and -retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or -`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable -views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively. - -All types of state also have a method `clear()` that clears the state for the currently -active key, i.e. the key of the input element. - -<span class="label label-danger">Attention</span> `FoldingState` will be deprecated in one of -the next versions of Flink and will be completely removed in the future. A more general -alternative will be provided. - -It is important to keep in mind that these state objects are only used for interfacing -with state. The state is not necessarily stored inside but might reside on disk or somewhere else. -The second thing to keep in mind is that the value you get from the state -depends on the key of the input element. So the value you get in one invocation of your -user function can differ from the value in another invocation if the keys involved are different. - -To get a state handle, you have to create a `StateDescriptor`. 
This holds the name of the state -(as we will see later, you can create several states, and they have to have unique names so -that you can reference them), the type of the values that the state holds, and possibly -a user-specified function, such as a `ReduceFunction`. Depending on what type of state you -want to retrieve, you create either a `ValueStateDescriptor`, a `ListStateDescriptor`, -a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a `MapStateDescriptor`. - -State is accessed using the `RuntimeContext`, so it is only possible in *rich functions*. -Please see [here]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) for -information about that, but we will also see an example shortly. The `RuntimeContext` that -is available in a `RichFunction` has these methods for accessing state: - -* `ValueState<T> getState(ValueStateDescriptor<T>)` -* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)` -* `ListState<T> getListState(ListStateDescriptor<T>)` -* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)` -* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)` - -This is an example `FlatMapFunction` that shows how all of the parts fit together: - -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} -public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { - - /** - * The ValueState handle. The first field is the count, the second field a running sum. 
- */ - private transient ValueState<Tuple2<Long, Long>> sum; - - @Override - public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { - - // access the state value - Tuple2<Long, Long> currentSum = sum.value(); - - // update the count - currentSum.f0 += 1; - - // add the second field of the input value - currentSum.f1 += input.f1; - - // update the state - sum.update(currentSum); - - // if the count reaches 2, emit the average and clear the state - if (currentSum.f0 >= 2) { - out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); - sum.clear(); - } - } - - @Override - public void open(Configuration config) { - ValueStateDescriptor<Tuple2<Long, Long>> descriptor = - new ValueStateDescriptor<>( - "average", // the state name - TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information - Tuple2.of(0L, 0L)); // default value of the state, if nothing was set - sum = getRuntimeContext().getState(descriptor); - } -} - -// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env) -env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)) - .keyBy(0) - .flatMap(new CountWindowAverage()) - .print(); - -// the printed output will be (1,4) and (1,5) -{% endhighlight %} -</div> - -<div data-lang="scala" markdown="1"> -{% highlight scala %} -class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] { - - private var sum: ValueState[(Long, Long)] = _ - - override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = { - - // access the state value - val tmpCurrentSum = sum.value - - // If it hasn't been used before, it will be null - val currentSum = if (tmpCurrentSum != null) { - tmpCurrentSum - } else { - (0L, 0L) - } - - // update the count - val newSum = (currentSum._1 + 1, currentSum._2 + input._2) - - // update the state - sum.update(newSum) - - // if the count 
reaches 2, emit the average and clear the state - if (newSum._1 >= 2) { - out.collect((input._1, newSum._2 / newSum._1)) - sum.clear() - } - } - - override def open(parameters: Configuration): Unit = { - sum = getRuntimeContext.getState( - new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]) - ) - } -} - - -object ExampleCountWindowAverage extends App { - val env = StreamExecutionEnvironment.getExecutionEnvironment - - env.fromCollection(List( - (1L, 3L), - (1L, 5L), - (1L, 7L), - (1L, 4L), - (1L, 2L) - )).keyBy(_._1) - .flatMap(new CountWindowAverage()) - .print() - // the printed output will be (1,4) and (1,5) - - env.execute("ExampleManagedState") -} -{% endhighlight %} -</div> -</div> - -This example implements a poor man's counting window. We key the tuples by the first field -(in the example all have the same key `1`). The function stores the count and a running sum in -a `ValueState`. Once the count reaches 2 it will emit the average and clear the state so that -we start over from `0`. Note that this would keep a different state value for each different input -key if we had tuples with different values in the first field. - -### State in the Scala DataStream API - -In addition to the interface described above, the Scala API has shortcuts for stateful -`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. The user function -gets the current value of the `ValueState` in an `Option` and must return an updated value that -will be used to update the state. - -{% highlight scala %} -val stream: DataStream[(String, Int)] = ... 
- -val counts: DataStream[(String, Int)] = stream - .keyBy(_._1) - .mapWithState((in: (String, Int), count: Option[Int]) => - count match { - case Some(c) => ( (in._1, c), Some(c + in._2) ) - case None => ( (in._1, 0), Some(in._2) ) - }) -{% endhighlight %} - -## Using Managed Operator State - -To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction` -interface, or the `ListCheckpointed<T extends Serializable>` interface. - -#### CheckpointedFunction - -The `CheckpointedFunction` interface provides access to non-keyed state with different -redistribution schemes. It requires the implementation of two methods: - -{% highlight java %} -void snapshotState(FunctionSnapshotContext context) throws Exception; - -void initializeState(FunctionInitializationContext context) throws Exception; -{% endhighlight %} - -Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`, -is called every time the user-defined function is initialized, be that when the function is first initialized -or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not -only the place where different types of state are initialized, but also where state recovery logic is included. - -Currently, list-style managed operator state is supported. The state -is expected to be a `List` of *serializable* objects, independent from each other, -thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which -non-keyed state can be redistributed. Depending on the state accessing method, -the following redistribution schemes are defined: - - - **Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of - all lists. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators. 
- Each operator gets a sublist, which can be empty, or contain one or more elements. - As an example, if with parallelism 1 the checkpointed state of an operator - contains elements `element1` and `element2`, when increasing the parallelism to 2, `element1` may end up in operator instance 0, - while `element2` will go to operator instance 1. - - - **Union redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of - all lists. On restore/redistribution, each operator gets the complete list of state elements. - -Below is an example of a stateful `SinkFunction` that uses `CheckpointedFunction` -to buffer elements before sending them to the outside world. It demonstrates -the basic even-split redistribution list state: - -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} -public class BufferingSink - implements SinkFunction<Tuple2<String, Integer>>, - CheckpointedFunction, - CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> { - - private final int threshold; - - private transient ListState<Tuple2<String, Integer>> checkpointedState; - - private List<Tuple2<String, Integer>> bufferedElements; - - public BufferingSink(int threshold) { - this.threshold = threshold; - this.bufferedElements = new ArrayList<>(); - } - - @Override - public void invoke(Tuple2<String, Integer> value) throws Exception { - bufferedElements.add(value); - if (bufferedElements.size() == threshold) { - for (Tuple2<String, Integer> element: bufferedElements) { - // send it to the sink - } - bufferedElements.clear(); - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - checkpointedState.clear(); - for (Tuple2<String, Integer> element : bufferedElements) { - checkpointedState.add(element); - } - } - - @Override - public void initializeState(FunctionInitializationContext context) throws Exception { - ListStateDescriptor<Tuple2<String, Integer>> 
descriptor = - new ListStateDescriptor<>( - "buffered-elements", - TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})); - - checkpointedState = context.getOperatorStateStore().getListState(descriptor); - - if (context.isRestored()) { - for (Tuple2<String, Integer> element : checkpointedState.get()) { - bufferedElements.add(element); - } - } - } - - @Override - public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception { - // this is from the CheckpointedRestoring interface. - this.bufferedElements.addAll(state); - } -} -{% endhighlight %} -</div> - -<div data-lang="scala" markdown="1"> -{% highlight scala %} -class BufferingSink(threshold: Int = 0) - extends SinkFunction[(String, Int)] - with CheckpointedFunction - with CheckpointedRestoring[List[(String, Int)]] { - - @transient - private var checkpointedState: ListState[(String, Int)] = null - - private val bufferedElements = ListBuffer[(String, Int)]() - - override def invoke(value: (String, Int)): Unit = { - bufferedElements += value - if (bufferedElements.size == threshold) { - for (element <- bufferedElements) { - // send it to the sink - } - bufferedElements.clear() - } - } - - override def snapshotState(context: FunctionSnapshotContext): Unit = { - checkpointedState.clear() - for (element <- bufferedElements) { - checkpointedState.add(element) - } - } - - override def initializeState(context: FunctionInitializationContext): Unit = { - val descriptor = new ListStateDescriptor[(String, Int)]( - "buffered-elements", - TypeInformation.of(new TypeHint[(String, Int)]() {}) - ) - - checkpointedState = context.getOperatorStateStore.getListState(descriptor) - - if (context.isRestored) { - for (element <- checkpointedState.get()) { - bufferedElements += element - } - } - } - - override def restoreState(state: List[(String, Int)]): Unit = { - bufferedElements ++= state - } -} -{% endhighlight %} -</div> -</div> - -The `initializeState` method takes as argument a 
`FunctionInitializationContext`. This is used to initialize -the non-keyed state "containers". These are containers of type `ListState` in which the non-keyed state objects -are stored upon checkpointing. - -Note how the state is initialized, similar to keyed state, -with a `StateDescriptor` that contains the state name and information -about the type of the value that the state holds: - - -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} -ListStateDescriptor<Tuple2<String, Integer>> descriptor = - new ListStateDescriptor<>( - "buffered-elements", - TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})); - -checkpointedState = context.getOperatorStateStore().getListState(descriptor); -{% endhighlight %} - -</div> -<div data-lang="scala" markdown="1"> -{% highlight scala %} - -val descriptor = new ListStateDescriptor[(String, Int)]( - "buffered-elements", - TypeInformation.of(new TypeHint[(String, Int)]() {}) -) - -checkpointedState = context.getOperatorStateStore.getListState(descriptor) - -{% endhighlight %} -</div> -</div> -The name of each state access method consists of its redistribution -pattern followed by its state structure. For example, to use list state with the -union redistribution scheme on restore, access the state by using `getUnionListState(descriptor)`. -If the method name does not contain the redistribution pattern, *e.g.* `getListState(descriptor)`, -the basic even-split redistribution scheme is used. - -After initializing the container, we use the `isRestored()` method of the context to check if we are -recovering after a failure. If this is `true`, *i.e.* we are recovering, the restore logic is applied. - -As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state -initialization is kept in a class variable for future use in `snapshotState()`. 
There the `ListState` is cleared -of all objects included by the previous checkpoint, and is then filled with the new ones we want to checkpoint. - -As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done -using the provided `FunctionInitializationContext`. - -#### ListCheckpointed - -The `ListCheckpointed` interface is a more limited variant of `CheckpointedFunction`, -which only supports list-style state with the even-split redistribution scheme on restore. -It also requires the implementation of two methods: - -{% highlight java %} -List<T> snapshotState(long checkpointId, long timestamp) throws Exception; - -void restoreState(List<T> state) throws Exception; -{% endhighlight %} - -In `snapshotState()` the operator should return a list of objects to checkpoint, and -`restoreState()` has to handle such a list upon recovery. If the state is not re-partitionable, you can always -return `Collections.singletonList(MY_STATE)` from `snapshotState()`. - -### Stateful Source Functions - -Stateful sources require a bit more care than other operators. -In order to make the updates to the state and output collection atomic (required for exactly-once semantics -on failure/recovery), the user is required to get a lock from the source's context. 
- -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} -public static class CounterSource - extends RichParallelSourceFunction<Long> - implements ListCheckpointed<Long> { - - /** current offset for exactly-once semantics; starts at 0 so the first increment cannot hit a null */ - private Long offset = 0L; - - /** flag for job cancellation */ - private volatile boolean isRunning = true; - - @Override - public void run(SourceContext<Long> ctx) { - final Object lock = ctx.getCheckpointLock(); - - while (isRunning) { - // output and state update are atomic - synchronized (lock) { - ctx.collect(offset); - offset += 1; - } - } - } - - @Override - public void cancel() { - isRunning = false; - } - - @Override - public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) { - return Collections.singletonList(offset); - } - - @Override - public void restoreState(List<Long> state) { - for (Long s : state) { - offset = s; - } - } -} -{% endhighlight %} -</div> - -<div data-lang="scala" markdown="1"> -{% highlight scala %} -class CounterSource - extends RichParallelSourceFunction[Long] - with ListCheckpointed[Long] { - - @volatile - private var isRunning = true - - private var offset = 0L - - override def run(ctx: SourceFunction.SourceContext[Long]): Unit = { - val lock = ctx.getCheckpointLock - - while (isRunning) { - // output and state update are atomic - lock.synchronized({ - ctx.collect(offset) - - offset += 1 - }) - } - } - - override def cancel(): Unit = isRunning = false - - override def restoreState(state: util.List[Long]): Unit = - for (s <- state) { - offset = s - } - - override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] = - Collections.singletonList(offset) - -} -{% endhighlight %} -</div> -</div> - -Some operators might need to know when a checkpoint has been fully acknowledged by Flink, for example to communicate this to the outside world. In this case, see the `org.apache.flink.runtime.state.CheckpointListener` interface. 
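The two redistribution schemes described under `CheckpointedFunction` can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink code: the class `RedistributionSketch` and its methods are hypothetical names, and the contiguous split shown here is only one possible way to divide the list evenly. The assignment Flink actually performs may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the two redistribution schemes; not part of Flink.
final class RedistributionSketch {

    // The whole state is logically the concatenation of all per-operator lists.
    private static <T> List<T> concat(List<List<T>> checkpointed) {
        List<T> all = new ArrayList<>();
        for (List<T> perOperator : checkpointed) {
            all.addAll(perOperator);
        }
        return all;
    }

    // Even-split: divide the concatenated list into `parallelism` sublists
    // whose sizes differ by at most one. Sublists may be empty.
    static <T> List<List<T>> evenSplit(List<List<T>> checkpointed, int parallelism) {
        List<T> all = concat(checkpointed);
        int base = all.size() / parallelism;
        int remainder = all.size() % parallelism;
        List<List<T>> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < parallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union: every operator instance receives the complete list.
    static <T> List<List<T>> union(List<List<T>> checkpointed, int parallelism) {
        List<T> all = concat(checkpointed);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }
}
```

With a checkpoint taken at parallelism 1 that holds `element1` and `element2`, `evenSplit` at parallelism 2 yields one element per instance, while `union` hands both elements to every instance.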
- -## Custom Serialization for Managed State - -This section is a guideline for users who require custom serialization for their state. It covers how -to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using -Flink's own serializers, this section is irrelevant and can be skipped. - -### Using custom serializers - -As demonstrated in the above examples, when registering a managed operator or keyed state, a `StateDescriptor` is required -to specify the state's name, as well as information about the type of the state. The type information is used by Flink's -[type serialization framework](../types_serialization.html) to create appropriate serializers for the state. - -It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, -simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: - -<div class="codetabs" markdown="1"> -<div data-lang="java" markdown="1"> -{% highlight java %} -public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...}; - -ListStateDescriptor<Tuple2<String, Integer>> descriptor = - new ListStateDescriptor<>( - "state-name", - new CustomTypeSerializer()); - -checkpointedState = getRuntimeContext().getListState(descriptor); -{% endhighlight %} -</div> - -<div data-lang="scala" markdown="1"> -{% highlight scala %} -class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...} - -val descriptor = new ListStateDescriptor[(String, Integer)]( - "state-name", - new CustomTypeSerializer) - -checkpointedState = getRuntimeContext.getListState(descriptor) -{% endhighlight %} -</div> -</div> - -Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following -subsections), the written serializer needs to be deserialized and used. 
Therefore, it is recommended to avoid using -anonymous classes as your state serializers. The generated class name of an anonymous class is not guaranteed: -it varies across compilers and depends on the order in which the classes are instantiated within the enclosing class, which can -easily cause the previously written serializer to be unreadable (since the original class can no longer be found in the -classpath). - -### Handling serializer upgrades and compatibility - -Flink allows changing the serializers used to read and write managed state, so that users are not locked in to any -specific serialization. When state is restored, the new serializer registered for the state (i.e., the serializer -that comes with the `StateDescriptor` used to access the state in the restored job) is checked for compatibility, -and then becomes the new serializer for the state. - -A compatible serializer is one that can read the previously serialized bytes of the state, -and whose written binary format of the state also remains identical. The means to check the new serializer's compatibility -is provided through the following two methods of the `TypeSerializer` interface: - -{% highlight java %} -public abstract TypeSerializerConfigSnapshot snapshotConfiguration(); -public abstract CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot); -{% endhighlight %} - -Briefly speaking, every time a checkpoint is performed, the `snapshotConfiguration` method is called to create a -point-in-time view of the state serializer's configuration. The returned configuration snapshot is stored along with the -checkpoint as the state's metadata. When the checkpoint is used to restore a job, that serializer configuration snapshot -will be provided to the _new_ serializer of the same state via the counterpart method, `ensureCompatibility`, to verify -compatibility of the new serializer. 
This method serves as a check for whether or not the new serializer is compatible, -as well as a hook to possibly reconfigure the new serializer in the case that it is incompatible. - -Note that Flink's own serializers are implemented such that they are at least compatible with themselves, i.e. when the -same serializer is used for the state in the restored job, the serializers will reconfigure themselves to be compatible -with their previous configuration. - -The following subsections illustrate guidelines to implement these two methods when using custom serializers. - -#### Implementing the `snapshotConfiguration` method - -The serializer's configuration snapshot should capture enough information such that on restore, the information -carried over to the new serializer for the state is sufficient for it to determine whether or not it is compatible. -This could typically contain information about the serializer's parameters or binary format of the serialized data; -generally, anything that allows the new serializer to decide whether or not it can be used to read previously serialized -bytes, and that it writes in the same binary format. - -How the serializer's configuration snapshot is written to and read from checkpoints is fully customizable. Below -is the base class for all serializer configuration snapshot implementations, the `TypeSerializerConfigSnapshot`. - -{% highlight java %} -public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable { - public abstract int getVersion(); - public void read(DataInputView in) {...} - public void write(DataOutputView out) {...} -} -{% endhighlight %} - -The `read` and `write` methods define how the configuration is read from and written to the checkpoint. The base -implementations contain logic to read and write the version of the configuration snapshot, so they should be extended and -not completely overridden. 
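To illustrate why `read` and `write` should be extended rather than replaced, here is a minimal plain-Java sketch of the pattern. It does not use Flink's classes: `VersionedSnapshotSketch` and `DelimiterConfigSnapshot` are hypothetical stand-ins, and `java.io` data streams take the place of Flink's `DataInputView` and `DataOutputView`. The base class handles the version, and the subclass calls `super` before handling its own fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a versioned snapshot base class; NOT Flink's TypeSerializerConfigSnapshot.
abstract class VersionedSnapshotSketch {
    private int readVersion = -1;

    abstract int getVersion();

    // Version found in the bytes during the last read(), or -1 if nothing was read yet.
    int getReadVersion() {
        return readVersion;
    }

    // The base implementation writes the version first.
    void write(DataOutputStream out) throws IOException {
        out.writeInt(getVersion());
    }

    // The base implementation reads the version back before any payload.
    void read(DataInputStream in) throws IOException {
        readVersion = in.readInt();
    }
}

// Hypothetical configuration snapshot with a single field.
final class DelimiterConfigSnapshot extends VersionedSnapshotSketch {
    String delimiter = ",";

    // An empty constructor is needed so an instance can be created before read() fills it.
    DelimiterConfigSnapshot() {}

    @Override
    int getVersion() {
        return 1;
    }

    @Override
    void write(DataOutputStream out) throws IOException {
        super.write(out);          // keep the version handling of the base class
        out.writeUTF(delimiter);   // then append this snapshot's own fields
    }

    @Override
    void read(DataInputStream in) throws IOException {
        super.read(in);            // consume the version written by the base class
        delimiter = in.readUTF();
    }

    // Writes a snapshot to bytes and reads it back into a fresh instance.
    static DelimiterConfigSnapshot roundTrip(DelimiterConfigSnapshot original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }
            DelimiterConfigSnapshot restored = new DelimiterConfigSnapshot();
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                restored.read(in);
            }
            return restored;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the subclass skipped the `super` calls, the version would be missing from the written bytes and the reading side would misinterpret the payload.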
- -The version of the configuration snapshot is determined through the `getVersion` method. Versioning for the serializer -configuration snapshot is the means to maintain compatible configurations, as information included in the configuration -may change over time. By default, configuration snapshots are only compatible with the current version (as returned by -`getVersion`). To indicate that the configuration is compatible with other versions, override the `getCompatibleVersions` -method to return more version values. When reading from the checkpoint, you can use the `getReadVersion` method to -determine the version of the written configuration and adapt the read logic to the specific version. - -<span class="label label-danger">Attention</span> The version of the serializer's configuration snapshot is **not** -related to upgrading the serializer. The exact same serializer can have different implementations of its -configuration snapshot, for example when more information is added to the configuration to allow more comprehensive -compatibility checks in the future. - -One limitation of implementing a `TypeSerializerConfigSnapshot` is that an empty constructor must be present. The empty -constructor is required when reading the configuration snapshot from checkpoints. - -#### Implementing the `ensureCompatibility` method - -The `ensureCompatibility` method should contain logic that performs checks against the information about the previous -serializer carried over via the provided `TypeSerializerConfigSnapshot`, basically doing one of the following: - - * Check whether the serializer is compatible, while possibly reconfiguring itself (if required) so that it may be - compatible. Afterwards, acknowledge with Flink that the serializer is compatible. - - * Acknowledge that the serializer is incompatible and that state migration is required before Flink can proceed with - using the new serializer. 
- -The above cases can be translated to code by returning one of the following from the `ensureCompatibility` method: - - * **`CompatibilityResult.compatible()`**: This acknowledges that the new serializer is compatible, or has been reconfigured to - be compatible, and Flink can proceed with the job with the serializer as is. - - * **`CompatibilityResult.requiresMigration()`**: This acknowledges that the serializer is incompatible, or cannot be - reconfigured to be compatible, and requires a state migration before the new serializer can be used. State migration - is performed by using the previous serializer to read the restored state bytes to objects, and then serialized again - using the new serializer. - - * **`CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`**: This acknowledgement has equivalent semantics - to `CompatibilityResult.requiresMigration()`, but in the case that the previous serializer cannot be found or loaded - to read the restored state bytes for the migration, a provided `TypeDeserializer` can be used as a fallback resort. - -<span class="label label-danger">Attention</span> Currently, as of Flink 1.3, if the result of the compatibility check -acknowledges that state migration needs to be performed, the job simply fails to restore from the checkpoint as state -migration is currently not available. The ability to migrate state will be introduced in future releases. - -### Managing `TypeSerializer` and `TypeSerializerConfigSnapshot` classes in user code - -Since `TypeSerializer`s and `TypeSerializerConfigSnapshot`s are written as part of checkpoints along with the state -values, the availability of the classes within the classpath may affect restore behaviour. - -`TypeSerializer`s are directly written into checkpoints using Java Object Serialization. 
In the case that the new -serializer acknowledges that it is incompatible and requires state migration, it will be required to be present to be -able to read the restored state bytes. Therefore, if the original serializer class no longer exists or has been modified -(resulting in a different `serialVersionUID`) as a result of a serializer upgrade for the state, the restore would -not be able to proceed. The alternative to this requirement is to provide a fallback `TypeDeserializer` when -acknowledging that state migration is required, using `CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`. - -The class of `TypeSerializerConfigSnapshot`s in the restored checkpoint must exist in the classpath, as they are -fundamental components to compatibility checks on upgraded serializers and would not be able to be restored if the class -is not present. Since configuration snapshots are written to checkpoints using custom serialization, the implementation -of the class is free to be changed, as long as compatibility of the configuration change is handled using the versioning -mechanisms in `TypeSerializerConfigSnapshot`.
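
The decision made inside `ensureCompatibility`, as described in the subsection on implementing that method, can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `CompatibilitySketch`, its `Result` enum, and `DelimitedSerializer` are hypothetical and do not use Flink's `TypeSerializer` or `CompatibilityResult` types. The serializer compares the previous configuration with its own, reconfigures itself if it can, and otherwise reports that migration is required.

```java
// Hypothetical illustration of a compatibility check; not Flink API.
final class CompatibilitySketch {

    // Stand-in for the two outcomes of a compatibility check.
    enum Result { COMPATIBLE, REQUIRES_MIGRATION }

    // A hypothetical serializer whose binary format depends on a delimiter character.
    static final class DelimitedSerializer {
        private char delimiter;
        private final boolean reconfigurable;

        DelimitedSerializer(char delimiter, boolean reconfigurable) {
            this.delimiter = delimiter;
            this.reconfigurable = reconfigurable;
        }

        char delimiter() {
            return delimiter;
        }

        // previousDelimiter plays the role of the restored configuration snapshot.
        Result ensureCompatibility(char previousDelimiter) {
            if (previousDelimiter == delimiter) {
                // Same binary format: compatible as is.
                return Result.COMPATIBLE;
            }
            if (reconfigurable) {
                // Adopt the previous format so old bytes stay readable.
                delimiter = previousDelimiter;
                return Result.COMPATIBLE;
            }
            // The format differs and cannot be adapted.
            return Result.REQUIRES_MIGRATION;
        }
    }
}
```

A reconfigurable serializer created with `;` adopts `,` when the restored configuration used `,`, whereas a fixed one reports that migration is required.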