[FLINK-5456] [docs] Add stub for types of state and state interfaces

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58509531
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58509531
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58509531

Branch: refs/heads/release-1.2
Commit: 585095312a59fee953d6b370db0a939a8392dd19
Parents: ac193d6
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jan 10 12:31:21 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 16 11:53:24 2017 +0100

----------------------------------------------------------------------
 docs/dev/state.md                | 362 ----------------------------------
 docs/dev/stream/checkpointing.md | 152 ++++++++++++++
 docs/dev/stream/state.md         |  78 ++++++++
 docs/internals/state_backends.md |  71 -------
 4 files changed, 230 insertions(+), 433 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58509531/docs/dev/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/state.md b/docs/dev/state.md
deleted file mode 100644
index 4478bfc..0000000
--- a/docs/dev/state.md
+++ /dev/null
@@ -1,362 +0,0 @@
----
-title: "State & Checkpointing"
-nav-parent_id: streaming
-nav-id: state
-nav-pos: 40
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-All transformations in Flink may look like functions (in the functional 
processing terminology), but
-are in fact stateful operators. You can make *every* transformation (`map`, 
`filter`, etc) stateful
-by using Flink's state interface or checkpointing instance fields of your 
function. You can register
-any instance field
-as ***managed*** state by implementing an interface. In this case, and also in 
the case of using
-Flink's native state interface, Flink will automatically take consistent 
snapshots of your state
-periodically, and restore its value in the case of a failure.
-
-The end effect is that updates to any form of state are the same under 
failure-free execution and
-execution under failures.
-
-First, we look at how to make instance fields consistent under failures, and 
then we look at
-Flink's state interface.
-
-By default state checkpoints will be stored in-memory at the JobManager. For 
proper persistence of large
-state, Flink supports storing the checkpoints on file systems (HDFS, S3, or 
any mounted POSIX file system),
-which can be configured in the `flink-conf.yaml` or via 
`StreamExecutionEnvironment.setStateBackend(…)`.
-See [state backends]({{ site.baseurl }}/ops/state_backends.html) for 
information
-about the available state backends and how to configure them.
-
-* ToC
-{:toc}
-
-Enabling Checkpointing
--------------------------
-
-Flink has a checkpointing mechanism that recovers streaming jobs after 
failures. The checkpointing mechanism requires a *persistent* (or *durable*) 
source that
-can be asked for prior records again (Apache Kafka is a good example of such a 
source).
-
-The checkpointing mechanism stores the progress in the data sources and data 
sinks, the state of windows, as well as the user-defined state (see [Working 
with State]({{ site.baseurl }}/dev/state.html)) consistently to provide 
*exactly once* processing semantics. Where the checkpoints are stored (e.g., 
JobManager memory, file system, database) depends on the configured [state 
backend]({{ site.baseurl }}/ops/state_backends.html).
-
-The [docs on streaming fault tolerance]({{ site.baseurl 
}}/internals/stream_checkpointing.html) describe in detail the technique behind 
Flink's streaming fault tolerance mechanism.
-
-By default, checkpointing is disabled. To enable checkpointing, call 
`enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the 
checkpoint interval in milliseconds.
-
-Other parameters for checkpointing include:
-
-- *exactly-once vs. at-least-once*: You can optionally pass a mode to the 
`enableCheckpointing(n)` method to choose between the two guarantee levels.
-  Exactly-once is preferrable for most applications. At-least-once may be 
relevant for certain super-low-latency (consistently few milliseconds) 
applications.
-
-- *number of concurrent checkpoints*: By default, the system will not trigger 
another checkpoint while one is still in progress. This ensures that the 
topology does not spend too much time on checkpoints and not make progress with 
processing the streams. It is possible to allow for multiple overlapping 
checkpoints, which is interesting for pipelines that have a certain processing 
delay (for example because the functions call external services that need some 
time to respond) but that still want to do very frequent checkpoints (100s of 
milliseconds) to re-process very little upon failures.
-
-- *checkpoint timeout*: The time after which a checkpoint-in-progress is 
aborted, if it did not complete by then.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-// start a checkpoint every 1000 ms
-env.enableCheckpointing(1000);
-
-// advanced options:
-
-// set mode to exactly-once (this is the default)
-env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
-
-// checkpoints have to complete within one minute, or are discarded
-env.getCheckpointConfig().setCheckpointTimeout(60000);
-
-// allow only one checkpoint to be in progress at the same time
-env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment()
-
-// start a checkpoint every 1000 ms
-env.enableCheckpointing(1000)
-
-// advanced options:
-
-// set mode to exactly-once (this is the default)
-env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
-
-// checkpoints have to complete within one minute, or are discarded
-env.getCheckpointConfig.setCheckpointTimeout(60000)
-
-// allow only one checkpoint to be in progress at the same time
-env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
-{% endhighlight %}
-</div>
-</div>
-
-{% top %}
-
-## Using the Key/Value State Interface
-
-The Key/Value state interface provides access to different types of state that 
are all scoped to
-the key of the current input element. This means that this type of state can 
only be used
-on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
-
-Now, we will first look at the different types of state available and then we 
will see
-how they can be used in a program. The available state primitives are:
-
-* `ValueState<T>`: This keeps a value that can be updated and
-retrieved (scoped to key of the input element, mentioned above, so there will 
possibly be one value
-for each key that the operation sees). The value can be set using `update(T)` 
and retrieved using
-`T value()`.
-
-* `ListState<T>`: This keeps a list of elements. You can append elements and 
retrieve an `Iterable`
-over all currently stored elements. Elements are added using `add(T)`, the 
Iterable can
-be retrieved using `Iterable<T> get()`.
-
-* `ReducingState<T>`: This keeps a single value that represents the 
aggregation of all values
-added to the state. The interface is the same as for `ListState` but elements 
added using
-`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
-
-All types of state also have a method `clear()` that clears the state for the 
currently
-active key (i.e. the key of the input element).
-
-It is important to keep in mind that these state objects are only used for 
interfacing
-with state. The state is not necessarily stored inside but might reside on 
disk or somewhere else.
-The second thing to keep in mind is that the value you get from the state
-depends on the key of the input element. So the value you get in one 
invocation of your
-user function can differ from the value in another invocation if the keys 
involved are different.
-
-To get a state handle you have to create a `StateDescriptor`. This holds the 
name of the state
-(as we will later see you can create several states, and they have to have 
unique names so
-that you can reference them), the type of the values that the state holds, and 
possibly
-a user-specified function, such as a `ReduceFunction`. Depending on what type 
of state you
-want to retrieve, you create either a `ValueStateDescriptor`, a 
`ListStateDescriptor` or
-a `ReducingStateDescriptor`.
-
-State is accessed using the `RuntimeContext`, so it is only possible in *rich 
functions*.
-Please see [here]({{ site.baseurl }}/dev/api_concepts#rich-functions) for
-information about that, but we will also see an example shortly. The 
`RuntimeContext` that
-is available in a `RichFunction` has these methods for accessing state:
-
-* `ValueState<T> getState(ValueStateDescriptor<T>)`
-* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
-* `ListState<T> getListState(ListStateDescriptor<T>)`
-
-This is an example `FlatMapFunction` that shows how all of the parts fit 
together:
-
-{% highlight java %}
-public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>> {
-
-    /**
-     * The ValueState handle. The first field is the count, the second field a 
running sum.
-     */
-    private transient ValueState<Tuple2<Long, Long>> sum;
-
-    @Override
-    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, 
Long>> out) throws Exception {
-
-        // access the state value
-        Tuple2<Long, Long> currentSum = sum.value();
-
-        // update the count
-        currentSum.f0 += 1;
-
-        // add the second field of the input value
-        currentSum.f1 += input.f1;
-
-        // update the state
-        sum.update(currentSum);
-
-        // if the count reaches 2, emit the average and clear the state
-        if (currentSum.f0 >= 2) {
-            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
-            sum.clear();
-        }
-    }
-
-    @Override
-    public void open(Configuration config) {
-        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
-                new ValueStateDescriptor<>(
-                        "average", // the state name
-                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() 
{}), // type information
-                        Tuple2.of(0L, 0L)); // default value of the state, if 
nothing was set
-        sum = getRuntimeContext().getState(descriptor);
-    }
-}
-
-// this can be used in a streaming program like this (assuming we have a 
StreamExecutionEnvironment env)
-env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), 
Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
-        .keyBy(0)
-        .flatMap(new CountWindowAverage())
-        .print();
-
-// the printed output will be (1,4) and (1,5)
-{% endhighlight %}
-
-This example implements a poor man's counting window. We key the tuples by the 
first field
-(in the example all have the same key `1`). The function stores the count and 
a running sum in
-a `ValueState`. Once the count reaches 2 it will emit the average and clear 
the state so that
-we start over from `0`. Note that this would keep a different state value for 
each different input
-key if we had tuples with different values in the first field.
-
-### State in the Scala DataStream API
-
-In addition to the interface described above, the Scala API has shortcuts for 
stateful
-`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. 
The user function
-gets the current value of the `ValueState` in an `Option` and must return an 
updated value that
-will be used to update the state.
-
-{% highlight scala %}
-val stream: DataStream[(String, Int)] = ...
-
-val counts: DataStream[(String, Int)] = stream
-  .keyBy(_._1)
-  .mapWithState((in: (String, Int), count: Option[Int]) =>
-    count match {
-      case Some(c) => ( (in._1, c), Some(c + in._2) )
-      case None => ( (in._1, 0), Some(in._2) )
-    })
-{% endhighlight %}
-
-## Checkpointing Instance Fields
-
-Instance fields can be checkpointed by using the `Checkpointed` interface.
-
-When the user-defined function implements the `Checkpointed` interface, the 
`snapshotState(…)` and `restoreState(…)`
-methods will be executed to draw and restore function state.
-
-In addition to that, user functions can also implement the 
`CheckpointListener` interface to receive notifications on
-completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` 
method.
-Note that there is no guarantee for the user function to receive a 
notification if a failure happens between
-checkpoint completion and notification. The notifications should hence be 
treated in a way that notifications from
-later checkpoints can subsume missing notifications.
-
-The above example for `ValueState` can be implemented using instance fields 
like this:
-
-{% highlight java %}
-
-public class CountWindowAverage
-        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>
-        implements Checkpointed<Tuple2<Long, Long>> {
-
-    private Tuple2<Long, Long> sum = null;
-
-    @Override
-    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, 
Long>> out) throws Exception {
-
-        // update the count
-        sum.f0 += 1;
-
-        // add the second field of the input value
-        sum.f1 += input.f1;
-
-
-        // if the count reaches 2, emit the average and clear the state
-        if (sum.f0 >= 2) {
-            out.collect(new Tuple2<>(input.f0, sum.f1 / sum.f0));
-            sum = Tuple2.of(0L, 0L);
-        }
-    }
-
-    @Override
-    public void open(Configuration config) {
-        if (sum == null) {
-            // only recreate if null
-            // restoreState will be called before open()
-            // so this will already set the sum to the restored value
-            sum = Tuple2.of(0L, 0L);
-        }
-    }
-
-    // regularly persists state during normal operation
-    @Override
-    public Serializable snapshotState(long checkpointId, long 
checkpointTimestamp) {
-        return sum;
-    }
-
-    // restores state on recovery from failure
-    @Override
-    public void restoreState(Tuple2<Long, Long> state) {
-        sum = state;
-    }
-}
-{% endhighlight %}
-
-## Stateful Source Functions
-
-Stateful sources require a bit more care as opposed to other operators.
-In order to make the updates to the state and output collection atomic 
(required for exactly-once semantics
-on failure/recovery), the user is required to get a lock from the source's 
context.
-
-{% highlight java %}
-public static class CounterSource
-        extends RichParallelSourceFunction<Long>
-        implements Checkpointed<Long> {
-
-    /**  current offset for exactly once semantics */
-    private long offset;
-
-    /** flag for job cancellation */
-    private volatile boolean isRunning = true;
-
-    @Override
-    public void run(SourceContext<Long> ctx) {
-        final Object lock = ctx.getCheckpointLock();
-
-        while (isRunning) {
-            // output and state update are atomic
-            synchronized (lock) {
-                ctx.collect(offset);
-                offset += 1;
-            }
-        }
-    }
-
-    @Override
-    public void cancel() {
-        isRunning = false;
-    }
-
-    @Override
-    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
-        return offset;
-
-    }
-
-    @Override
-       public void restoreState(Long state) {
-        offset = state;
-    }
-}
-{% endhighlight %}
-
-Some operators might need the information when a checkpoint is fully 
acknowledged by Flink to communicate that with the outside world. In this case 
see the `org.apache.flink.runtime.state.CheckpointListener` interface.
-
-## State Checkpoints in Iterative Jobs
-
-Flink currently only provides processing guarantees for jobs without 
iterations. Enabling checkpointing on an iterative job causes an exception. In 
order to force checkpointing on an iterative program the user needs to set a 
special flag when enabling checkpointing: `env.enableCheckpointing(interval, 
force = true)`.
-
-Please note that records in flight in the loop edges (and the state changes 
associated with them) will be lost during failure.
-
-{% top %}
-
-## Restart Strategies
-
-Flink supports different restart strategies which control how the jobs are 
restarted in case of a failure. For more 
-information, see [Restart Strategies]({{ site.baseurl 
}}/dev/restart_strategies.html).

http://git-wip-us.apache.org/repos/asf/flink/blob/58509531/docs/dev/stream/checkpointing.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/checkpointing.md b/docs/dev/stream/checkpointing.md
new file mode 100644
index 0000000..774d9ef
--- /dev/null
+++ b/docs/dev/stream/checkpointing.md
@@ -0,0 +1,152 @@
+---
+title: "Checkpointing"
+nav-parent_id: streaming
+nav-pos: 50
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* ToC
+{:toc}
+
+Every function and operator in Flink can be **stateful** (see [working with 
state](state.html) for details).
+Stateful functions store data across the processing of individual 
elements/events, making state a critical building block for
+any type of more elaborate operation.
+
+In order to make state fault tolerant, Flink needs to **checkpoint** the 
state. Checkpoints allow Flink to recover state and positions
+in the streams to give the application the same semantics as a failure-free 
execution.
+
+The [documentation on streaming fault 
tolerance](../../internals/stream_checkpointing.html) describe in detail the 
technique behind Flink's streaming fault tolerance mechanism.
+
+
+## Prerequisites
+
+Flink's checkpointing mechanism interacts with durable storage for streams and 
state. In general, it requires:
+
+  - A *persistent* (or *durable*) data source that can replay records for a 
certain amount of time. Examples for such sources are persistent messages 
queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file 
systems (e.g., HDFS, S3, GFS, NFS, Ceph, ...).
+  - A persistent storage for state, typically a distributed filesystem (e.g., 
HDFS, S3, GFS, NFS, Ceph, ...)
+
+
+## Enabling and Configuring Checkpointing
+
+By default, checkpointing is disabled. To enable checkpointing, call 
`enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the 
checkpoint interval in milliseconds.
+
+Other parameters for checkpointing include:
+
+  - *exactly-once vs. at-least-once*: You can optionally pass a mode to the 
`enableCheckpointing(n)` method to choose between the two guarantee levels.
+    Exactly-once is preferrable for most applications. At-least-once may be 
relevant for certain super-low-latency (consistently few milliseconds) 
applications.
+
+  - *checkpoint timeout*: The time after which a checkpoint-in-progress is 
aborted, if it did not complete by then.
+
+  - *minimum time between checkpoints*: To make sure that the streaming 
application makes a certain amount of progress between checkpoints,
+    one can define how much time needs to pass between checkpoints. If this 
value is set for example to *5000*, the next checkpoint will be
+    started no sooner than 5 seconds after the previous checkpoint completed, 
regardless of the checkpoint duration and the checkpoint interval.
+    Note that this implies that the checkpoint interval will never be smaller 
than this parameter.
+    
+    It is often easier to configure applications by defining the "time between 
checkpoints" then the checkpoint interval, because the "time between 
checkpoints"
+    is not susceptible to the fact that checkpoints may sometimes take longer 
than on average (for example if the target storage system is temporarily slow).
+
+    Note that this value also implies that the number of concurrent 
checkpoints is *one*.
+
+  - *number of concurrent checkpoints*: By default, the system will not 
trigger another checkpoint while one is still in progress.
+    This ensures that the topology does not spend too much time on checkpoints 
and not make progress with processing the streams.
+    It is possible to allow for multiple overlapping checkpoints, which is 
interesting for pipelines that have a certain processing delay
+    (for example because the functions call external services that need some 
time to respond) but that still want to do very frequent checkpoints
+    (100s of milliseconds) to re-process very little upon failures.
+
+    This option cannot be used when a minimum time between checkpoints is 
defined.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000);
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+// make sure 500 ms of progress happen between checkpoints
+env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig().setCheckpointTimeout(60000);
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000)
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+
+// make sure 500 ms of progress happen between checkpoints
+env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig.setCheckpointTimeout(60000)
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+
+## Selecting a State Backend
+
+The checkpointing mechanism stores the progress in the data sources and data 
sinks, the state of windows, as well as the [user-defined state](state.html) 
consistently to
+provide *exactly once* processing semantics. Where the checkpoints are stored 
(e.g., JobManager memory, file system, database) depends on the configured
+**State Backend**. 
+
+By default state will be kept in memory, and checkpoints will be stored 
in-memory at the master node (the JobManager). For proper persistence of large 
state,
+Flink supports various forms of storing and checkpointing state in so called 
**State Backends**, which can be set via 
`StreamExecutionEnvironment.setStateBackend(…)`.
+
+See [state backends](../../ops/state_backends.html) for more details on the 
available state backends and options for job-wide and cluster-wide 
configuration.
+
+
+## State Checkpoints in Iterative Jobs
+
+Flink currently only provides processing guarantees for jobs without 
iterations. Enabling checkpointing on an iterative job causes an exception. In 
order to force checkpointing on an iterative program the user needs to set a 
special flag when enabling checkpointing: `env.enableCheckpointing(interval, 
force = true)`.
+
+Please note that records in flight in the loop edges (and the state changes 
associated with them) will be lost during failure.
+
+{% top %}
+
+
+## Restart Strategies
+
+Flink supports different restart strategies which control how the jobs are 
restarted in case of a failure. For more 
+information, see [Restart Strategies]({{ site.baseurl 
}}/dev/restart_strategies.html).
+
+{% top %}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/58509531/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
new file mode 100644
index 0000000..0b38a62
--- /dev/null
+++ b/docs/dev/stream/state.md
@@ -0,0 +1,78 @@
+---
+title: "Working with State"
+nav-parent_id: streaming
+nav-pos: 40
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* ToC
+{:toc}
+
+Stateful functions and operators store data across the processing of 
individual elements/events, making state a critical building block for
+any type of more elaborate operation. For example: 
+
+  - When an application searches for certain event patterns, the state will 
store the sequence of events encountered so far.
+  - When aggregating events per minute, the state holds the pending aggregates.
+  - When training a machine learning model over a stream of data points, the 
state holds the current verstion of the model parameters.
+
+In order to make state fault tolerant, Flink needs to be aware of the state 
and [checkpoint](checkpointing.html) it.
+In many cases, Flink can also *manage* the state for the application, meaning 
Flink deals with the memory management (possibly spilling to disk
+if necessary) to allow applications to hold very large state.
+
+This document explains how to use Flink's state abstractions when developing 
an application.
+
+
+## Keyed State and Operator state
+
+There are two basic state backends: `Keyed State` and `Operator State`.
+
+#### Keyed State
+
+*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
+Examples of keyed state are the `ValueState` or `ListState` that one can 
create in a function on a `KeyedStream`, as
+well as the state of a keyed window operator.
+
+Keyed State is organized in so called *Key Groups*. Key Groups are the unit by 
which keyed state can be redistributed and
+there are as many key groups as the defined maximum parallelism.
+During execution each parallel instance of an operator gets one or more key 
groups.
+
+#### Operator State
+
+*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` 
interface in Flink 1.0 and Flink 1.1.
+The new `CheckpointedFunction` interface is basically a shortcut (syntactic 
sugar) for the Operator State.
+
+Operator State needs special re-distribution schemes when parallelism is 
changed. There can be different variations of such
+schemes; the following are currently defined:
+
+  - **List-style redistribution:** Each operator returns a List of state 
elements. The whole state is logically a concatenation of
+    all lists. On restore/redistribution, the list is evenly divided into as 
many sublists as there are parallel operators.
+    Each operator gets a sublist, which can be empty, or contain one or more 
elements.
+
+
+## Raw and Managed State
+
+*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
+
+*Managed State* is represented in data structures controlled by the Flink 
runtime, such as internal hash tables, or RocksDB.
+Examples are "ValueState", "ListState", etc. Flink's runtime encodes the 
states and writes them into the checkpoints.
+
+*Raw State* is state that users and operators keep in their own data 
structures. When checkpointed, they only write a sequence of bytes into
+the checkpoint. Flink knows nothing about the state's data structures and sees 
only the raw bytes.
+

http://git-wip-us.apache.org/repos/asf/flink/blob/58509531/docs/internals/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/internals/state_backends.md b/docs/internals/state_backends.md
deleted file mode 100644
index 11d46ed..0000000
--- a/docs/internals/state_backends.md
+++ /dev/null
@@ -1,71 +0,0 @@
----
-title:  "State and State Backends"
-nav-title: State Backends
-nav-parent_id: internals
-nav-pos: 4
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-**NOTE** This document is only a sketch of some bullet points, to be fleshed 
out.
-
-**NOTE** The structure of State Backends changed heavily between version 1.1 
and 1.2. This documentation is only applicable
-to Apache Flink version 1.2 and later.
-
-
-## Keyed State and Operator state
-
-There are two basic state backends: `Keyed State` and `Operator State`.
-
-#### Keyed State
-
-*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
-Examples of keyed state are the `ValueState` or `ListState` that one can 
create in a function on a `KeyedStream`, as
-well as the state of a keyed window operator.
-
-Keyed State is organized in so called *Key Groups*. Key Groups are the unit by 
which keyed state can be redistributed and
-there are as many key groups as the defined maximum parallelism.
-During execution each parallel instance of an operator gets one or more key 
groups.
-
-#### Operator State
-
-*Operator State* is state per parallel subtask. It subsumes the `Checkpointed` 
interface in Flink 1.0 and Flink 1.1.
-The new `CheckpointedFunction` interface is basically a shortcut (syntactic 
sugar) for the Operator State.
-
-Operator State needs special re-distribution schemes when parallelism is 
changed. There can be different variations of such
-schemes; the following are currently defined:
-
-  - **List-style redistribution:** Each operator returns a List of state 
elements. The whole state is logically a concatenation of
-    all lists. On restore/redistribution, the list is evenly divided into as 
many sublists as there are parallel operators.
-    Each operator gets a sublist, which can be empty, or contain one or more 
elements.
-
-
-## Raw and Managed State
-
-*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
-
-*Managed State* is represented in data structures controlled by the Flink 
runtime, such as internal hash tables, or RocksDB.
-Examples are "ValueState", "ListState", etc. Flink's runtime encodes the 
states and writes them into the checkpoints.
-
-*Raw State* is state that users and operators keep in their own data 
structures. When checkpointed, they only write a sequence of bytes into
-the checkpoint. Flink knows nothing about the state's data structures and sees 
only the raw bytes.
-

Reply via email to