http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/checkpointing.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/checkpointing.md 
b/docs/dev/stream/state/checkpointing.md
new file mode 100644
index 0000000..34c16b0
--- /dev/null
+++ b/docs/dev/stream/state/checkpointing.md
@@ -0,0 +1,174 @@
+---
+title: "Checkpointing"
+nav-parent_id: streaming_state
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* ToC
+{:toc}
+
+Every function and operator in Flink can be **stateful** (see [working with 
state](state.html) for details).
+Stateful functions store data across the processing of individual 
elements/events, making state a critical building block for
+any type of more elaborate operation.
+
+In order to make state fault tolerant, Flink needs to **checkpoint** the 
state. Checkpoints allow Flink to recover state and positions
+in the streams to give the application the same semantics as a failure-free 
execution.
+
+The [documentation on streaming fault 
tolerance](../../../internals/stream_checkpointing.html) describes in detail 
the technique behind Flink's streaming fault tolerance mechanism.
+
+
+## Prerequisites
+
+Flink's checkpointing mechanism interacts with durable storage for streams and 
state. In general, it requires:
+
+  - A *persistent* (or *durable*) data source that can replay records for a 
certain amount of time. Examples for such sources are persistent message 
queues (e.g., Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub) or file 
systems (e.g., HDFS, S3, GFS, NFS, Ceph, ...).
+  - A persistent storage for state, typically a distributed filesystem (e.g., 
HDFS, S3, GFS, NFS, Ceph, ...)
+
+
+## Enabling and Configuring Checkpointing
+
+By default, checkpointing is disabled. To enable checkpointing, call 
`enableCheckpointing(n)` on the `StreamExecutionEnvironment`, where *n* is the 
checkpoint interval in milliseconds.
+
+Other parameters for checkpointing include:
+
+  - *exactly-once vs. at-least-once*: You can optionally pass a mode to the 
`enableCheckpointing(n)` method to choose between the two guarantee levels.
+    Exactly-once is preferable for most applications. At-least-once may be relevant for certain super-low-latency (consistently a few milliseconds) 
applications.
+
+  - *checkpoint timeout*: The time after which a checkpoint-in-progress is 
aborted, if it did not complete by then.
+
+  - *minimum time between checkpoints*: To make sure that the streaming 
application makes a certain amount of progress between checkpoints,
+    one can define how much time needs to pass between checkpoints. If this 
value is set for example to *5000*, the next checkpoint will be
+    started no sooner than 5 seconds after the previous checkpoint completed, 
regardless of the checkpoint duration and the checkpoint interval.
+    Note that this implies that the checkpoint interval will never be smaller 
than this parameter.
+    
+    It is often easier to configure applications by defining the "time between checkpoints" than the checkpoint interval, because the "time between checkpoints"
+    is not affected by the fact that checkpoints may sometimes take longer than average (for example if the target storage system is temporarily slow).
+
+    Note that this value also implies that the number of concurrent 
checkpoints is *one*.
+
+  - *number of concurrent checkpoints*: By default, the system will not 
trigger another checkpoint while one is still in progress.
+    This ensures that the topology does not spend too much time on checkpoints while failing to make progress with processing the streams.
+    It is possible to allow for multiple overlapping checkpoints, which is 
interesting for pipelines that have a certain processing delay
+    (for example because the functions call external services that need some 
time to respond) but that still want to do very frequent checkpoints
+    (100s of milliseconds) to re-process very little upon failures.
+
+    This option cannot be used when a minimum time between checkpoints is 
defined.
+
+  - *externalized checkpoints*: You can configure periodic checkpoints to be 
persisted externally. Externalized checkpoints write their meta data out to 
persistent storage and are *not* automatically cleaned up when the job fails. 
This way, you will have a checkpoint around to resume from if your job fails. 
There are more details in the [deployment notes on externalized 
checkpoints](../../ops/state/checkpoints.html#externalized-checkpoints).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000);
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+
+// make sure 500 ms of progress happen between checkpoints
+env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig().setCheckpointTimeout(60000);
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
+
+// enable externalized checkpoints which are retained after job cancellation
+env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+// start a checkpoint every 1000 ms
+env.enableCheckpointing(1000)
+
+// advanced options:
+
+// set mode to exactly-once (this is the default)
+env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+
+// make sure 500 ms of progress happen between checkpoints
+env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
+
+// checkpoints have to complete within one minute, or are discarded
+env.getCheckpointConfig.setCheckpointTimeout(60000)
+
+// allow only one checkpoint to be in progress at the same time
+env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
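+
+// enable externalized checkpoints which are retained after job cancellation
+env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)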
+{% endhighlight %}
+</div>
+</div>
+
+### Related Config Options
+
+Some more parameters and/or defaults may be set via `conf/flink-conf.yaml` 
(see [configuration]({{ site.baseurl }}/ops/config.html) for a full guide):
+
+- `state.backend`: The backend that will be used to store operator state 
checkpoints if checkpointing is enabled. Supported backends:
+   -  `jobmanager`: In-memory state, backup to JobManager's/ZooKeeper's 
memory. Should be used only for minimal state (Kafka offsets) or testing and 
local debugging.
+   -  `filesystem`: State is in-memory on the TaskManagers, and state 
snapshots are stored in a file system. Supported are all filesystems supported 
by Flink, for example HDFS, S3, ...
+
+- `state.backend.fs.checkpointdir`: Directory for storing checkpoints in a 
Flink supported filesystem. Note: State backend must be accessible from the 
JobManager, use `file://` only for local setups.
+
+- `state.backend.rocksdb.checkpointdir`:  The local directory for storing 
RocksDB files, or a list of directories separated by the system's directory 
delimiter (for example ‘:’ (colon) on Linux/Unix). (DEFAULT value is 
`taskmanager.tmp.dirs`)
+
+- `state.checkpoints.dir`: The target directory for meta data of [externalized 
checkpoints]({{ site.baseurl 
}}/ops/state/checkpoints.html#externalized-checkpoints).
+
+- `state.checkpoints.num-retained`: The number of completed checkpoint 
instances to retain. Having more than one allows recovery fallback to an 
earlier checkpoint if the latest checkpoint is corrupt. (Default: 1)
+
+{% top %}
+
+
+## Selecting a State Backend
+
+Flink's [checkpointing mechanism]({{ site.baseurl 
}}/internals/stream_checkpointing.html) stores consistent snapshots
+of all the state in timers and stateful operators, including connectors, 
windows, and any [user-defined state](state.html).
+Where the checkpoints are stored (e.g., JobManager memory, file system, 
database) depends on the configured
+**State Backend**. 
+
+By default, state is kept in memory in the TaskManagers and checkpoints are 
stored in memory in the JobManager. For proper persistence of large state,
+Flink supports various approaches for storing and checkpointing state in other 
state backends. The choice of state backend can be configured via 
`StreamExecutionEnvironment.setStateBackend(…)`.
+
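+For example, a minimal sketch (the `FsStateBackend` checkpoint path below is just a placeholder) of selecting a file-system backend for a job could look like this:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// keep working state on the TaskManager heap, but write checkpoint snapshots
+// to a durable file system such as HDFS
+env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
+{% endhighlight %}
+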
+See [state backends]({{ site.baseurl }}/ops/state/state_backends.html) for 
more details on the available state backends and options for job-wide and 
cluster-wide configuration.
+
+
+## State Checkpoints in Iterative Jobs
+
+Flink currently only provides processing guarantees for jobs without 
iterations. Enabling checkpointing on an iterative job causes an exception. In 
order to force checkpointing on an iterative program the user needs to set a 
special flag when enabling checkpointing: `env.enableCheckpointing(interval, 
force = true)`.
+
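+A minimal sketch (assuming the `enableCheckpointing` variant that also takes the checkpointing mode and the force flag) could look as follows in the Java API:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// force checkpointing even though the job contains an iteration
+env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE, true);
+{% endhighlight %}
+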
+Please note that records in flight in the loop edges (and the state changes 
associated with them) will be lost during failure.
+
+{% top %}
+
+
+## Restart Strategies
+
+Flink supports different restart strategies which control how the jobs are 
restarted in case of a failure. For more 
+information, see [Restart Strategies]({{ site.baseurl 
}}/dev/restart_strategies.html).
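+
+As a minimal sketch (the attempt count and delay are arbitrary example values), a fixed-delay strategy can be configured programmatically like this:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// restart the job up to 3 times, waiting 10 seconds between attempts
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
+{% endhighlight %}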
+
+{% top %}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/custom_serialization.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/custom_serialization.md 
b/docs/dev/stream/state/custom_serialization.md
new file mode 100644
index 0000000..fbb7b83
--- /dev/null
+++ b/docs/dev/stream/state/custom_serialization.md
@@ -0,0 +1,188 @@
+---
+title: "Custom Serialization for Managed State"
+nav-title: "Custom Serialization"
+nav-parent_id: streaming_state
+nav-pos: 10
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+If your application uses Flink's managed state, it might be necessary to 
implement custom serialization logic for special use cases.
+
+This page is a guide for users who require custom serialization for their state, covering how to provide a custom serializer and how to handle serializer upgrades for compatibility. If you're simply using
+Flink's own serializers, this page is irrelevant and can be skipped.
+
+### Using custom serializers
+
+As demonstrated in the examples in [Working with State](state.html), when registering a managed operator or keyed state, a `StateDescriptor` is required
+to specify the state's name, as well as information about the type of the 
state. The type information is used by Flink's
+[type serialization framework](../../types_serialization.html) to create 
appropriate serializers for the state.
+
+It is also possible to completely bypass this and let Flink use your own 
custom serializer to serialize managed states,
+simply by directly instantiating the `StateDescriptor` with your own 
`TypeSerializer` implementation:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, 
Integer>> {...};
+
+ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+    new ListStateDescriptor<>(
+        "state-name",
+        new CustomTypeSerializer());
+
+checkpointedState = getRuntimeContext().getListState(descriptor);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
+
+val descriptor = new ListStateDescriptor[(String, Integer)](
+    "state-name",
+    new CustomTypeSerializer)
+
+checkpointedState = getRuntimeContext.getListState(descriptor)
+{% endhighlight %}
+</div>
+</div>
+
+Note that Flink writes state serializers along with the state as metadata. In 
certain cases on restore (see following
+subsections), the written serializer needs to be deserialized and used. 
Therefore, it is recommended to avoid using
+anonymous classes as your state serializers. Anonymous classes have no guarantee about their generated classname: it varies across compilers and depends on the order in which they are instantiated within the enclosing class. This can easily cause the previously written serializer to become unreadable (since the original class can no longer be found in the classpath).
+
+### Handling serializer upgrades and compatibility
+
+Flink allows changing the serializers used to read and write managed state, so 
that users are not locked in to any
+specific serialization. When state is restored, the new serializer registered 
for the state (i.e., the serializer
+that comes with the `StateDescriptor` used to access the state in the restored 
job) will be checked for compatibility,
+and is replaced as the new serializer for the state.
+
+A serializer is compatible if it is capable of reading the previously serialized bytes of the state, and if the binary format it writes also remains identical. The means to check the new serializer's compatibility
+is provided through the following two methods of the `TypeSerializer` 
interface:
+
+{% highlight java %}
+public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
+public abstract CompatibilityResult 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
+{% endhighlight %}
+
+Briefly speaking, every time a checkpoint is performed, the 
`snapshotConfiguration` method is called to create a
+point-in-time view of the state serializer's configuration. The returned 
configuration snapshot is stored along with the
+checkpoint as the state's metadata. When the checkpoint is used to restore a 
job, that serializer configuration snapshot
+will be provided to the _new_ serializer of the same state via the counterpart 
method, `ensureCompatibility`, to verify
+compatibility of the new serializer. This method serves as a check for whether 
or not the new serializer is compatible,
+as well as a hook to possibly reconfigure the new serializer in the case that 
it is incompatible.
+
+Note that Flink's own serializers are implemented such that they are at least 
compatible with themselves, i.e. when the
+same serializer is used for the state in the restored job, the serializers will reconfigure themselves to be compatible
+with their previous configuration.
+
+The following subsections illustrate guidelines to implement these two methods 
when using custom serializers.
+
+#### Implementing the `snapshotConfiguration` method
+
+The serializer's configuration snapshot should capture enough information such 
that on restore, the information
+carried over to the new serializer for the state is sufficient for it to 
determine whether or not it is compatible.
+This could typically contain information about the serializer's parameters or 
binary format of the serialized data;
+generally, anything that allows the new serializer to decide whether or not it 
can be used to read previous serialized
+bytes, and that it writes in the same binary format.
+
+How the serializer's configuration snapshot is written to and read from 
checkpoints is fully customizable. Below is the base class for all serializer configuration snapshot implementations, `TypeSerializerConfigSnapshot`.
+
+{% highlight java %}
+public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
+  public abstract int getVersion();
+  public void read(DataInputView in) {...}
+  public void write(DataOutputView out) {...}
+}
+{% endhighlight %}
+
+The `read` and `write` methods define how the configuration is read from and 
written to the checkpoint. The base
+implementations contain logic to read and write the version of the 
configuration snapshot, so it should be extended and
+not completely overridden.
+
+The version of the configuration snapshot is determined through the 
`getVersion` method. Versioning for the serializer
+configuration snapshot is the means to maintain compatible configurations, as 
information included in the configuration
+may change over time. By default, configuration snapshots are only compatible 
with the current version (as returned by
+`getVersion`). To indicate that the configuration is compatible with other 
versions, override the `getCompatibleVersions`
+method to return more version values. When reading from the checkpoint, you 
can use the `getReadVersion` method to
+determine the version of the written configuration and adapt the read logic to 
the specific version.
+
+<span class="label label-danger">Attention</span> The version of the 
serializer's configuration snapshot is **not**
+related to upgrading the serializer. The exact same serializer can have 
different implementations of its
+configuration snapshot, for example when more information is added to the 
configuration to allow more comprehensive
+compatibility checks in the future.
+
+One limitation of implementing a `TypeSerializerConfigSnapshot` is that an 
empty constructor must be present. The empty
+constructor is required when reading the configuration snapshot from 
checkpoints.
+
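+To make this concrete, the following is a minimal sketch (the class, its field, and the character-encoding scenario are hypothetical) of a configuration snapshot that records a single parameter of its serializer:
+
+{% highlight java %}
+public class CustomEncodingConfigSnapshot extends TypeSerializerConfigSnapshot {
+
+    private static final int VERSION = 1;
+
+    private String charsetName;
+
+    /** Empty constructor, required for reading the snapshot from checkpoints. */
+    public CustomEncodingConfigSnapshot() {}
+
+    public CustomEncodingConfigSnapshot(String charsetName) {
+        this.charsetName = charsetName;
+    }
+
+    public String getCharsetName() {
+        return charsetName;
+    }
+
+    @Override
+    public int getVersion() {
+        return VERSION;
+    }
+
+    @Override
+    public void read(DataInputView in) throws IOException {
+        super.read(in); // the base class reads and checks the snapshot version
+        this.charsetName = in.readUTF();
+    }
+
+    @Override
+    public void write(DataOutputView out) throws IOException {
+        super.write(out); // the base class writes the snapshot version
+        out.writeUTF(charsetName);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof CustomEncodingConfigSnapshot
+                && charsetName.equals(((CustomEncodingConfigSnapshot) obj).charsetName);
+    }
+
+    @Override
+    public int hashCode() {
+        return charsetName.hashCode();
+    }
+}
+{% endhighlight %}
+
+The corresponding serializer's `snapshotConfiguration()` would then simply return `new CustomEncodingConfigSnapshot(charsetName)`.
+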
+#### Implementing the `ensureCompatibility` method
+
+The `ensureCompatibility` method should contain logic that performs checks 
against the information about the previous
+serializer carried over via the provided `TypeSerializerConfigSnapshot`, 
basically doing one of the following:
+
+  * Check whether the serializer is compatible, while possibly reconfiguring 
itself (if required) so that it may be
+    compatible. Afterwards, acknowledge with Flink that the serializer is 
compatible.
+
+  * Acknowledge that the serializer is incompatible and that state migration 
is required before Flink can proceed with
+    using the new serializer.
+
+The above cases can be translated to code by returning one of the following 
from the `ensureCompatibility` method:
+
+  * **`CompatibilityResult.compatible()`**: This acknowledges that the new 
serializer is compatible, or has been reconfigured to
+    be compatible, and Flink can proceed with the job with the serializer as 
is.
+
+  * **`CompatibilityResult.requiresMigration()`**: This acknowledges that the 
serializer is incompatible, or cannot be
+    reconfigured to be compatible, and requires a state migration before the 
new serializer can be used. State migration
+    is performed by using the previous serializer to read the restored state 
bytes to objects, and then serialized again
+    using the new serializer.
+
+  * **`CompatibilityResult.requiresMigration(TypeDeserializer 
deserializer)`**: This acknowledgement has equivalent semantics
+    to `CompatibilityResult.requiresMigration()`, but in the case that the 
previous serializer cannot be found or loaded
+    to read the restored state bytes for the migration, a provided 
`TypeDeserializer` can be used as a fallback.
+
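+Putting this together, a minimal sketch of `ensureCompatibility` for a hypothetical encoding-aware `TypeSerializer<String>` (continuing the hypothetical `CustomEncodingConfigSnapshot` example from the previous subsection) could look like this:
+
+{% highlight java %}
+@Override
+public CompatibilityResult<String> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+    if (configSnapshot instanceof CustomEncodingConfigSnapshot) {
+        String previousCharset = ((CustomEncodingConfigSnapshot) configSnapshot).getCharsetName();
+
+        if (previousCharset.equals(this.charsetName)) {
+            // same binary format as before; this serializer can be used as is
+            return CompatibilityResult.compatible();
+        }
+    }
+
+    // unknown snapshot type or a different encoding: the binary format has changed,
+    // so state migration would be required before this serializer can be used
+    return CompatibilityResult.requiresMigration();
+}
+{% endhighlight %}
+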
+<span class="label label-danger">Attention</span> Currently, as of Flink 1.3, 
if the result of the compatibility check
+acknowledges that state migration needs to be performed, the job simply fails 
to restore from the checkpoint as state
+migration is currently not available. The ability to migrate state will be 
introduced in future releases.
+
+### Managing `TypeSerializer` and `TypeSerializerConfigSnapshot` classes in 
user code
+
+Since `TypeSerializer`s and `TypeSerializerConfigSnapshot`s are written as 
part of checkpoints along with the state
+values, the availability of the classes within the classpath may affect 
restore behaviour.
+
+`TypeSerializer`s are directly written into checkpoints using Java Object 
Serialization. In the case that the new
+serializer acknowledges that it is incompatible and requires state migration, the previously written serializer will be required to be present to read the restored state bytes. Therefore, if the original serializer 
class no longer exists or has been modified
+(resulting in a different `serialVersionUID`) as a result of a serializer 
upgrade for the state, the restore would
+not be able to proceed. The alternative to this requirement is to provide a 
fallback `TypeDeserializer` when
+acknowledging that state migration is required, using 
`CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`.
+
+The class of `TypeSerializerConfigSnapshot`s in the restored checkpoint must 
exist in the classpath, as they are
+fundamental components of the compatibility checks on upgraded serializers and cannot be restored if the class
+is not present. Since configuration snapshots are written to checkpoints using 
custom serialization, the implementation
+of the class is free to be changed, as long as compatibility of the 
configuration change is handled using the versioning
+mechanisms in `TypeSerializerConfigSnapshot`.

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/index.md b/docs/dev/stream/state/index.md
new file mode 100644
index 0000000..d1c0edb
--- /dev/null
+++ b/docs/dev/stream/state/index.md
@@ -0,0 +1,56 @@
+---
+title: "State & Fault Tolerance"
+nav-id: streaming_state
+nav-title: "State & Fault Tolerance"
+nav-parent_id: streaming
+nav-pos: 3
+nav-show_overview: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Stateful functions and operators store data across the processing of 
individual elements/events, making state a critical building block for
+any type of more elaborate operation.
+
+For example:
+
+  - When an application searches for certain event patterns, the state will 
store the sequence of events encountered so far.
+  - When aggregating events per minute/hour/day, the state holds the pending 
aggregates.
+  - When training a machine learning model over a stream of data points, the 
state holds the current version of the model parameters.
+  - When historic data needs to be managed, the state allows efficient access 
to events that occurred in the past.
+
+Flink needs to be aware of the state in order to make state fault tolerant 
using [checkpoints](checkpointing.html) and to allow [savepoints]({{ 
site.baseurl }}/ops/state/savepoints.html) of streaming applications.
+
+Knowledge about the state also allows for rescaling Flink applications, 
meaning that Flink takes care of redistributing state across parallel instances.
+
+The [queryable state](queryable_state.html) feature of Flink allows you to 
access state from outside of Flink during runtime.
+
+When working with state, it might also be useful to read about [Flink's state 
backends]({{ site.baseurl }}/ops/state/state_backends.html). Flink provides 
different state backends that specify how and where state is stored. State can 
be located on Java's heap or off-heap. Depending on your state backend, Flink 
can also *manage* the state for the application, meaning Flink deals with the 
memory management (possibly spilling to disk if necessary) to allow 
applications to hold very large state. State backends can be configured without 
changing your application logic.
+
+{% top %}
+
+Where to go next?
+-----------------
+
+* [Working with State](state.html): Shows how to use state in a Flink 
application and explains the different kinds of state.
+* [Checkpointing](checkpointing.html): Describes how to enable and configure 
checkpointing for fault tolerance.
+* [Queryable State](queryable_state.html): Explains how to access state from 
outside of Flink during runtime.
+* [Custom Serialization for Managed State](custom_serialization.html): 
Discusses custom serialization logic for state and its upgrades.
+
+{% top %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/queryable_state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/queryable_state.md 
b/docs/dev/stream/state/queryable_state.md
new file mode 100644
index 0000000..bd0d7fb
--- /dev/null
+++ b/docs/dev/stream/state/queryable_state.md
@@ -0,0 +1,292 @@
+---
+title: "Queryable State"
+nav-parent_id: streaming_state
+nav-pos: 3
+is_beta: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* ToC
+{:toc}
+
+<div class="alert alert-warning">
+  <strong>Note:</strong> The client APIs for queryable state are currently in 
an evolving state and
+  there are <strong>no guarantees</strong> made about stability of the 
provided interfaces. It is
+  likely that there will be breaking API changes on the client side in the 
upcoming Flink versions.
+</div>
+
+In a nutshell, this feature allows users to query Flink's managed partitioned 
state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) 
from outside of
+Flink. For some scenarios, queryable state thus eliminates the need for 
distributed
+operations/transactions with external systems such as key-value stores which 
are often the
+bottleneck in practice.
+
+<div class="alert alert-warning">
+  <strong>Attention:</strong> Queryable state accesses keyed state from a 
concurrent thread rather
+  than synchronizing with the operator and potentially blocking its operation. 
Since any state
+  backend using Java heap space, e.g. MemoryStateBackend or
+  FsStateBackend, does not work with copies when retrieving values but instead 
directly
+  references the stored values, read-modify-write patterns are unsafe and may 
cause the
+  queryable state server to fail due to concurrent modifications.
+  The RocksDBStateBackend is safe from these issues.
+</div>
+
+## Making State Queryable
+
+In order to make state queryable, the queryable state server first needs to be 
enabled globally
+by setting the `query.server.enable` configuration parameter to `true` 
(current default).
+Then appropriate state needs to be made queryable by using either
+
+* a `QueryableStateStream`, a convenience object which behaves like a sink and 
offers incoming values as
+queryable state, or
+* `StateDescriptor#setQueryable(String queryableStateName)`, which makes the 
keyed state of an
+operator queryable.
+
+The following sections explain the use of these two approaches.
+
+### Queryable State Stream
+
+A `KeyedStream` may offer its values as queryable state by using the following 
methods:
+
+{% highlight java %}
+// ValueState
+QueryableStateStream asQueryableState(
+    String queryableStateName,
+    ValueStateDescriptor stateDescriptor)
+
+// Shortcut for explicit ValueStateDescriptor variant
+QueryableStateStream asQueryableState(String queryableStateName)
+
+// FoldingState
+QueryableStateStream asQueryableState(
+    String queryableStateName,
+    FoldingStateDescriptor stateDescriptor)
+
+// ReducingState
+QueryableStateStream asQueryableState(
+    String queryableStateName,
+    ReducingStateDescriptor stateDescriptor)
+{% endhighlight %}
+
+
+<div class="alert alert-info">
+  <strong>Note:</strong> There is no queryable <code>ListState</code> sink as 
it would result in an ever-growing
+  list which may not be cleaned up and thus will eventually consume too much 
memory.
+</div>
+
+A call to these methods returns a `QueryableStateStream`, which currently only holds the name as well as the value and key serializer for the queryable state stream. It is comparable to a sink, and cannot be followed by further transformations.
+
+Internally a `QueryableStateStream` gets translated to an operator which uses 
all incoming
+records to update the queryable state instance.
+In a program like the following, all records of the keyed stream will be used 
to update the state
+instance, either via `ValueState#update(value)` or 
`AppendingState#add(value)`, depending on
+the chosen state variant:
+{% highlight java %}
+stream.keyBy(0).asQueryableState("query-name")
+{% endhighlight %}
+This acts like the Scala API's `flatMapWithState`.
+
+### Managed Keyed State
+
+Managed keyed state of an operator
+(see [Using Managed Keyed State]({{ site.baseurl 
}}/dev/stream/state/state.html#using-managed-keyed-state))
+can be made queryable by making the appropriate state descriptor queryable via
+`StateDescriptor#setQueryable(String queryableStateName)`, as in the example 
below:
+{% highlight java %}
+ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+        new ValueStateDescriptor<>(
+                "average", // the state name
+                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 
type information
+                Tuple2.of(0L, 0L)); // default value of the state, if nothing 
was set
+descriptor.setQueryable("query-name"); // queryable state name
+{% endhighlight %}
+<div class="alert alert-info">
+  <strong>Note:</strong> The `queryableStateName` parameter may be chosen 
arbitrarily and is only
+  used for queries. It does not have to be identical to the state's own name.
+</div>
+
+
+## Querying State
+
+The `QueryableStateClient` helper class may be used for queries against the 
`KvState` instances that
+serve the state internally. It needs to be set up with a valid `JobManager` 
address and port and is
+created as follows:
+
+{% highlight java %}
+final Configuration config = new Configuration();
+config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
+config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
+
+final HighAvailabilityServices highAvailabilityServices =
+      HighAvailabilityServicesUtils.createHighAvailabilityServices(
+           config,
+           Executors.newSingleThreadScheduledExecutor(),
+           
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
+
+QueryableStateClient client = new QueryableStateClient(config, 
highAvailabilityServices);
+{% endhighlight %}
+
+The query method is this:
+
+{% highlight java %}
+Future<byte[]> getKvState(
+    JobID jobID,
+    String queryableStateName,
+    int keyHashCode,
+    byte[] serializedKeyAndNamespace)
+{% endhighlight %}
+
+A call to this method returns a `Future` eventually holding the serialized 
state value for the
+queryable state instance identified by `queryableStateName` of the job with ID 
`jobID`. The
+`keyHashCode` is the hash code of the key as returned by `Object.hashCode()` 
and the
+`serializedKeyAndNamespace` is the serialized key and namespace.
+<div class="alert alert-info">
+  <strong>Note:</strong> The client is asynchronous and can be shared by 
multiple threads. It needs
+  to be shut down via <code>QueryableStateClient#shutdown()</code> when unused 
in order to free
+  resources.
+</div>
+
+The current implementation is still pretty low-level in the sense that it only 
works with
+serialized data both for providing the key/namespace and the returned results. 
It is the
+responsibility of the user (or some follow-up utilities) to set up the 
serializers for this. The
+nice thing about this is that the query services don't have to get into the 
business of worrying
+about any class loading issues etc.
+
+There are some serialization utils for key/namespace and value serialization 
included in
+`KvStateRequestSerializer`.
+
+### Example
+
+The following example extends the `CountWindowAverage` example
+(see [Using Managed Keyed State]({{ site.baseurl 
}}/dev/stream/state/state.html#using-managed-keyed-state))
+by making it queryable and showing how to query this value:
+
+{% highlight java %}
+public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>> {
+
+    private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, 
Long>> out) throws Exception {
+        Tuple2<Long, Long> currentSum = sum.value();
+        currentSum.f0 += 1;
+        currentSum.f1 += input.f1;
+        sum.update(currentSum);
+
+        if (currentSum.f0 >= 2) {
+            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
+            sum.clear();
+        }
+    }
+
+    @Override
+    public void open(Configuration config) {
+        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+                new ValueStateDescriptor<>(
+                        "average", // the state name
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() 
{}), // type information
+                        Tuple2.of(0L, 0L)); // default value of the state, if 
nothing was set
+        descriptor.setQueryable("query-name");
+        sum = getRuntimeContext().getState(descriptor);
+    }
+}
+{% endhighlight %}
+
+Once used in a job, you can retrieve the job ID and then query any key's 
current state from this operator:
+
+{% highlight java %}
+final Configuration config = new Configuration();
+config.setString(JobManagerOptions.ADDRESS, queryAddress);
+config.setInteger(JobManagerOptions.PORT, queryPort);
+
+final HighAvailabilityServices highAvailabilityServices =
+      HighAvailabilityServicesUtils.createHighAvailabilityServices(
+           config,
+           Executors.newSingleThreadScheduledExecutor(),
+           
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
+
+QueryableStateClient client = new QueryableStateClient(config, 
highAvailabilityServices);
+
+final TypeSerializer<Long> keySerializer =
+        TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new 
ExecutionConfig());
+final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
+        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() 
{}).createSerializer(new ExecutionConfig());
+
+final byte[] serializedKey =
+        KvStateRequestSerializer.serializeKeyAndNamespace(
+                key, keySerializer,
+                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
+
+Future<byte[]> serializedResult =
+        client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
+
+// now wait for the result and return it
+final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
+byte[] serializedValue = Await.result(serializedResult, duration);
+Tuple2<Long, Long> value =
+        KvStateRequestSerializer.deserializeValue(serializedValue, 
valueSerializer);
+{% endhighlight %}
+
+### Note for Scala Users
+
+Please use the available Scala extensions when creating the `TypeSerializer` 
instances. Add the following import:
+
+```scala
+import org.apache.flink.streaming.api.scala._
+```
+
+Now you can create the type serializers as follows:
+
+```scala
+val keySerializer = createTypeInformation[Long]
+  .createSerializer(new ExecutionConfig)
+```
+
+If you don't do this, you can run into mismatches between the serializers used 
in the Flink job and in your client code, because types like `scala.Long` 
cannot be captured at runtime.
+
+## Configuration
+
+The following configuration parameters influence the behaviour of the 
queryable state server and client.
+They are defined in `QueryableStateOptions`.
+
+### Server
+* `query.server.enable`: flag to indicate whether to start the queryable state 
server
+* `query.server.port`: port to bind to the internal `KvStateServer` (0 => pick 
random available port)
+* `query.server.network-threads`: number of network (event loop) threads for 
the `KvStateServer` (0 => #slots)
+* `query.server.query-threads`: number of asynchronous query threads for the 
`KvStateServerHandler` (0 => #slots).
+
+### Client (`QueryableStateClient`)
+* `query.client.network-threads`: number of network (event loop) threads for 
the `KvStateClient` (0 => number of available cores)
+* `query.client.lookup.num-retries`: number of retries on location lookup 
failures
+* `query.client.lookup.retry-delay`: retry delay on location lookup failures 
(millis)
+
+## Limitations
+
+* The queryable state life-cycle is bound to the life-cycle of the job, e.g. 
tasks register
+queryable state on startup and unregister it on disposal. In future versions, 
it is desirable to
+decouple this in order to allow queries after a task finishes, and to speed up 
recovery via state
+replication.
+* Notifications about available KvState happen via a simple tell. In the 
future this should be improved to be
+more robust with asks and acknowledgements.
+* The server and client keep track of statistics for queries. These are 
currently disabled by
+default as they would not be exposed anywhere. As soon as there is better 
support to publish these
+numbers via the Metrics system, we should enable the stats.

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
new file mode 100644
index 0000000..f280ceb
--- /dev/null
+++ b/docs/dev/stream/state/state.md
@@ -0,0 +1,596 @@
+---
+title: "Working with State"
+nav-parent_id: streaming_state
+nav-pos: 1
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This document explains how to use Flink's state abstractions when developing 
an application.
+
+* ToC
+{:toc}
+
+## Keyed State and Operator State
+
+There are two basic kinds of state in Flink: `Keyed State` and `Operator 
State`.
+
+### Keyed State
+
+*Keyed State* is always relative to keys and can only be used in functions and 
operators on a `KeyedStream`.
+
+You can think of Keyed State as Operator State that has been partitioned,
+or sharded, with exactly one state-partition per key.
+Each keyed-state is logically bound to a unique
+composite of <parallel-operator-instance, key>, and since each key
+"belongs" to exactly one parallel instance of a keyed operator, we can
+think of this simply as <operator, key>.
+
+Keyed State is further organized into so-called *Key Groups*. Key Groups are 
the
+atomic unit by which Flink can redistribute Keyed State;
+there are exactly as many Key Groups as the defined maximum parallelism.
+During execution each parallel instance of a keyed operator works with the keys
+for one or more Key Groups.
+
+### Operator State
+
+With *Operator State* (or *non-keyed state*), each operator state is
+bound to one parallel operator instance.
+The [Kafka Connector]({{ site.baseurl }}/dev/connectors/kafka.html) is a good 
motivating example for the use of Operator State
+in Flink. Each parallel instance of the Kafka consumer maintains a map
+of topic partitions and offsets as its Operator State.
+
+The Operator State interfaces support redistributing state among
+parallel operator instances when the parallelism is changed. There can be 
different schemes for doing this redistribution.
+
+## Raw and Managed State
+
+*Keyed State* and *Operator State* exist in two forms: *managed* and *raw*.
+
+*Managed State* is represented in data structures controlled by the Flink 
runtime, such as internal hash tables, or RocksDB.
+Examples are "ValueState", "ListState", etc. Flink's runtime encodes
+the states and writes them into the checkpoints.
+
+*Raw State* is state that operators keep in their own data structures. When 
checkpointed, they only write a sequence of bytes into
+the checkpoint. Flink knows nothing about the state's data structures and sees 
only the raw bytes.
+
+All datastream functions can use managed state, but the raw state interfaces 
can only be used when implementing operators.
+Using managed state (rather than raw state) is recommended, since with
+managed state Flink is able to automatically redistribute state when the 
parallelism is
+changed, and also do better memory management.
+
+<span class="label label-danger">Attention</span> If your managed state needs 
custom serialization logic, please see 
+the [corresponding guide](custom_serialization.html) in order to ensure future 
compatibility. Flink's default serializers 
+don't need special treatment.
+
+## Using Managed Keyed State
+
+The managed keyed state interface provides access to different types of state 
that are all scoped to
+the key of the current input element. This means that this type of state can 
only be used
+on a `KeyedStream`, which can be created via `stream.keyBy(…)`.
+
+Now, we will first look at the different types of state available and then we 
will see
+how they can be used in a program. The available state primitives are:
+
+* `ValueState<T>`: This keeps a value that can be updated and
+retrieved (scoped to key of the input element as mentioned above, so there 
will possibly be one value
+for each key that the operation sees). The value can be set using `update(T)` 
and retrieved using
+`T value()`.
+
+* `ListState<T>`: This keeps a list of elements. You can append elements and 
retrieve an `Iterable`
+over all currently stored elements. Elements are added using `add(T)`, the 
Iterable can
+be retrieved using `Iterable<T> get()`.
+
+* `ReducingState<T>`: This keeps a single value that represents the 
aggregation of all values
+added to the state. The interface is the same as for `ListState` but elements 
added using
+`add(T)` are reduced to an aggregate using a specified `ReduceFunction`.
+
+* `FoldingState<T, ACC>`: This keeps a single value that represents the 
aggregation of all values
+added to the state. Contrary to `ReducingState`, the aggregate type may be 
different from the type
+of elements that are added to the state. The interface is the same as for 
`ListState` but elements
+added using `add(T)` are folded into an aggregate using a specified 
`FoldFunction`.
+
+* `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value 
pairs into the state and
+retrieve an `Iterable` over all currently stored mappings. Mappings are added 
using `put(UK, UV)` or
+`putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved 
using `get(UK)`. The iterable
+views for mappings, keys and values can be retrieved using `entries()`, 
`keys()` and `values()` respectively.
+
+All types of state also have a method `clear()` that clears the state for the 
currently
+active key, i.e. the key of the input element.
+
+<span class="label label-danger">Attention</span> `FoldingState` will be 
deprecated in one of
+the next versions of Flink and will be completely removed in the future. A 
more general
+alternative will be provided.
+
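+As a short illustration of `MapState` (which the larger example further below does not cover), the following is a minimal sketch inside a rich function; the class name, state name, and click-counting scenario are arbitrary example choices:
+
+{% highlight java %}
+public class PerUserPageCounts extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, Long>> {
+
+    // per-user map from page id to the number of clicks seen so far
+    private transient MapState<String, Long> clicksPerPage;
+
+    @Override
+    public void open(Configuration config) {
+        MapStateDescriptor<String, Long> descriptor =
+                new MapStateDescriptor<>(
+                        "clicks-per-page", // the state name
+                        String.class,      // type of the keys in the map
+                        Long.class);       // type of the values in the map
+        clicksPerPage = getRuntimeContext().getMapState(descriptor);
+    }
+
+    @Override
+    public void flatMap(Tuple2<String, String> input, Collector<Tuple2<String, Long>> out) throws Exception {
+        // input is (userId, pageId); the stream is assumed to be keyed by the user id
+        Long current = clicksPerPage.get(input.f1);
+        long updated = (current == null) ? 1L : current + 1L;
+        clicksPerPage.put(input.f1, updated);
+        out.collect(Tuple2.of(input.f1, updated));
+    }
+}
+{% endhighlight %}
+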
+It is important to keep in mind that these state objects are only used for 
interfacing
+with state. The state is not necessarily stored inside but might reside on 
disk or somewhere else.
+The second thing to keep in mind is that the value you get from the state
+depends on the key of the input element. So the value you get in one 
invocation of your
+user function can differ from the value in another invocation if the keys 
involved are different.
+
+To get a state handle, you have to create a `StateDescriptor`. This holds the 
name of the state
+(as we will see later, you can create several states, and they have to have 
unique names so
+that you can reference them), the type of the values that the state holds, and 
possibly
+a user-specified function, such as a `ReduceFunction`. Depending on what type 
of state you
+want to retrieve, you create either a `ValueStateDescriptor`, a 
`ListStateDescriptor`,
+a `ReducingStateDescriptor`, a `FoldingStateDescriptor` or a 
`MapStateDescriptor`.
+
+State is accessed using the `RuntimeContext`, so it is only possible in *rich 
functions*.
+Please see [here]({{ site.baseurl }}/dev/api_concepts.html#rich-functions) for
+information about that, but we will also see an example shortly. The 
`RuntimeContext` that
+is available in a `RichFunction` has these methods for accessing state:
+
+* `ValueState<T> getState(ValueStateDescriptor<T>)`
+* `ReducingState<T> getReducingState(ReducingStateDescriptor<T>)`
+* `ListState<T> getListState(ListStateDescriptor<T>)`
+* `FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)`
+* `MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)`
+
+This is an example `FlatMapFunction` that shows how all of the parts fit 
together:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, 
Long>, Tuple2<Long, Long>> {
+
+    /**
+     * The ValueState handle. The first field is the count, the second field a 
running sum.
+     */
+    private transient ValueState<Tuple2<Long, Long>> sum;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, 
Long>> out) throws Exception {
+
+        // access the state value
+        Tuple2<Long, Long> currentSum = sum.value();
+
+        // update the count
+        currentSum.f0 += 1;
+
+        // add the second field of the input value
+        currentSum.f1 += input.f1;
+
+        // update the state
+        sum.update(currentSum);
+
+        // if the count reaches 2, emit the average and clear the state
+        if (currentSum.f0 >= 2) {
+            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
+            sum.clear();
+        }
+    }
+
+    @Override
+    public void open(Configuration config) {
+        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+                new ValueStateDescriptor<>(
+                        "average", // the state name
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() 
{}), // type information
+                        Tuple2.of(0L, 0L)); // default value of the state, if 
nothing was set
+        sum = getRuntimeContext().getState(descriptor);
+    }
+}
+
+// this can be used in a streaming program like this (assuming we have a 
StreamExecutionEnvironment env)
+env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), 
Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
+        .keyBy(0)
+        .flatMap(new CountWindowAverage())
+        .print();
+
+// the printed output will be (1,4) and (1,5)
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, 
Long)] {
+
+  private var sum: ValueState[(Long, Long)] = _
+
+  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): 
Unit = {
+
+    // access the state value
+    val tmpCurrentSum = sum.value
+
+    // If it hasn't been used before, it will be null
+    val currentSum = if (tmpCurrentSum != null) {
+      tmpCurrentSum
+    } else {
+      (0L, 0L)
+    }
+
+    // update the count
+    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
+
+    // update the state
+    sum.update(newSum)
+
+    // if the count reaches 2, emit the average and clear the state
+    if (newSum._1 >= 2) {
+      out.collect((input._1, newSum._2 / newSum._1))
+      sum.clear()
+    }
+  }
+
+  override def open(parameters: Configuration): Unit = {
+    sum = getRuntimeContext.getState(
+      new ValueStateDescriptor[(Long, Long)]("average", 
createTypeInformation[(Long, Long)])
+    )
+  }
+}
+
+
+object ExampleCountWindowAverage extends App {
+  val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+  env.fromCollection(List(
+    (1L, 3L),
+    (1L, 5L),
+    (1L, 7L),
+    (1L, 4L),
+    (1L, 2L)
+  )).keyBy(_._1)
+    .flatMap(new CountWindowAverage())
+    .print()
+  // the printed output will be (1,4) and (1,5)
+
+  env.execute("ExampleManagedState")
+}
+{% endhighlight %}
+</div>
+</div>
+
+This example implements a poor man's counting window. We key the tuples by the 
first field
+(in the example all have the same key `1`). The function stores the count and 
a running sum in
+a `ValueState`. Once the count reaches 2 it will emit the average and clear 
the state so that
+we start over from `0`. Note that this would keep a different state value for 
each different input
+key if we had tuples with different values in the first field.
+
+### State in the Scala DataStream API
+
+In addition to the interface described above, the Scala API has shortcuts for 
stateful
+`map()` or `flatMap()` functions with a single `ValueState` on `KeyedStream`. 
The user function
+gets the current value of the `ValueState` in an `Option` and must return an 
updated value that
+will be used to update the state.
+
+{% highlight scala %}
+val stream: DataStream[(String, Int)] = ...
+
+val counts: DataStream[(String, Int)] = stream
+  .keyBy(_._1)
+  .mapWithState((in: (String, Int), count: Option[Int]) =>
+    count match {
+      case Some(c) => ( (in._1, c), Some(c + in._2) )
+      case None => ( (in._1, 0), Some(in._2) )
+    })
+{% endhighlight %}
+
+## Using Managed Operator State
+
+To use managed operator state, a stateful function can implement either the 
more general `CheckpointedFunction`
+interface, or the `ListCheckpointed<T extends Serializable>` interface.
+
+#### CheckpointedFunction
+
+The `CheckpointedFunction` interface provides access to non-keyed state with 
different
+redistribution schemes. It requires the implementation of two methods:
+
+{% highlight java %}
+void snapshotState(FunctionSnapshotContext context) throws Exception;
+
+void initializeState(FunctionInitializationContext context) throws Exception;
+{% endhighlight %}
+
+Whenever a checkpoint has to be performed, `snapshotState()` is called. The 
counterpart, `initializeState()`,
+is called every time the user-defined function is initialized, be that when 
the function is first initialized
+or be that when the function is actually recovering from an earlier 
checkpoint. Given this, `initializeState()` is not
+only the place where different types of state are initialized, but also where 
state recovery logic is included.
+
+Currently, list-style managed operator state is supported. The state
+is expected to be a `List` of *serializable* objects, independent from each 
other,
+thus eligible for redistribution upon rescaling. In other words, these objects 
are the finest granularity at which
+non-keyed state can be redistributed. Depending on the state accessing method,
+the following redistribution schemes are defined:
+
+  - **Even-split redistribution:** Each operator returns a List of state 
elements. The whole state is logically a concatenation of
+    all lists. On restore/redistribution, the list is evenly divided into as 
many sublists as there are parallel operators.
+    Each operator gets a sublist, which can be empty, or contain one or more 
elements.
+    As an example, if with parallelism 1 the checkpointed state of an operator
+    contains elements `element1` and `element2`, when increasing the 
parallelism to 2, `element1` may end up in operator instance 0,
+    while `element2` will go to operator instance 1.
+
+  - **Union redistribution:** Each operator returns a List of state elements. 
The whole state is logically a concatenation of
+    all lists. On restore/redistribution, each operator gets the complete list 
of state elements.
+
+Below is an example of a stateful `SinkFunction` that uses 
`CheckpointedFunction`
+to buffer elements before sending them to the outside world. It demonstrates
+the basic even-split redistribution list state:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class BufferingSink
+        implements SinkFunction<Tuple2<String, Integer>>,
+                   CheckpointedFunction,
+                   CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+
+    private final int threshold;
+
+    private transient ListState<Tuple2<String, Integer>> checkpointedState;
+
+    private List<Tuple2<String, Integer>> bufferedElements;
+
+    public BufferingSink(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
+
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+                // send it to the sink
+            }
+            bufferedElements.clear();
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        checkpointedState.clear();
+        for (Tuple2<String, Integer> element : bufferedElements) {
+            checkpointedState.add(element);
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+            new ListStateDescriptor<>(
+                "buffered-elements",
+                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
+
+        checkpointedState = 
context.getOperatorStateStore().getListState(descriptor);
+
+        if (context.isRestored()) {
+            for (Tuple2<String, Integer> element : checkpointedState.get()) {
+                bufferedElements.add(element);
+            }
+        }
+    }
+
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws 
Exception {
+        // this is from the CheckpointedRestoring interface.
+        this.bufferedElements.addAll(state);
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class BufferingSink(threshold: Int = 0)
+  extends SinkFunction[(String, Int)]
+    with CheckpointedFunction
+    with CheckpointedRestoring[List[(String, Int)]] {
+
+  @transient
+  private var checkpointedState: ListState[(String, Int)] = null
+
+  private val bufferedElements = ListBuffer[(String, Int)]()
+
+  override def invoke(value: (String, Int)): Unit = {
+    bufferedElements += value
+    if (bufferedElements.size == threshold) {
+      for (element <- bufferedElements) {
+        // send it to the sink
+      }
+      bufferedElements.clear()
+    }
+  }
+
+  override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    checkpointedState.clear()
+    for (element <- bufferedElements) {
+      checkpointedState.add(element)
+    }
+  }
+
+  override def initializeState(context: FunctionInitializationContext): Unit = 
{
+    val descriptor = new ListStateDescriptor[(String, Int)](
+      "buffered-elements",
+      TypeInformation.of(new TypeHint[(String, Int)]() {})
+    )
+
+    checkpointedState = context.getOperatorStateStore.getListState(descriptor)
+
+    if (context.isRestored) {
+      for (element <- checkpointedState.get()) {
+        bufferedElements += element
+      }
+    }
+  }
+
+  override def restoreState(state: List[(String, Int)]): Unit = {
+    bufferedElements ++= state
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+The `initializeState` method takes a `FunctionInitializationContext` as an argument.
+This is used to initialize the non-keyed state "containers". These are containers
+of type `ListState` where the non-keyed state objects are stored upon checkpointing.
+
+Note how the state is initialized, similar to keyed state,
+with a `StateDescriptor` that contains the state name and information
+about the type of the value that the state holds:
+
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+    new ListStateDescriptor<>(
+        "buffered-elements",
+        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
+
+checkpointedState = context.getOperatorStateStore().getListState(descriptor);
+{% endhighlight %}
+
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val descriptor = new ListStateDescriptor[(String, Int)](
+    "buffered-elements",
+    TypeInformation.of(new TypeHint[(String, Int)]() {})
+)
+
+checkpointedState = context.getOperatorStateStore.getListState(descriptor)
+
+{% endhighlight %}
+</div>
+</div>
+
+The naming convention of the state access methods contains the redistribution
+pattern followed by the state structure. For example, to use list state with the
+union redistribution scheme on restore, access the state using
+`getUnionListState(descriptor)`, as sketched below. If the method name does not
+contain the redistribution pattern, *e.g.* `getListState(descriptor)`, it simply
+implies that the basic even-split redistribution scheme will be used.
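+
+As a non-authoritative sketch, reusing the descriptor from the `BufferingSink`
+example above, requesting the union variant would only change the access call;
+everything else stays the same:
+
+{% highlight java %}
+// same descriptor as in the even-split example above
+ListStateDescriptor<Tuple2<String, Integer>> descriptor =
+    new ListStateDescriptor<>(
+        "buffered-elements",
+        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
+
+// on restore, every parallel instance receives the complete list of elements
+checkpointedState = context.getOperatorStateStore().getUnionListState(descriptor);
+{% endhighlight %}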
+
+After initializing the container, we use the `isRestored()` method of the 
context to check if we are
+recovering after a failure. If this is `true`, *i.e.* we are recovering, the 
restore logic is applied.
+
+As shown in the code of the `BufferingSink` above, the `ListState` obtained during
+state initialization is kept in a class variable for future use in `snapshotState()`.
+There the `ListState` is cleared of all objects included in the previous checkpoint,
+and is then filled with the new ones we want to checkpoint.
+
+As a side note, the keyed state can also be initialized in the 
`initializeState()` method. This can be done
+using the provided `FunctionInitializationContext`.
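+
+For illustration only (the state name and type below are made up, and this only
+applies when the function runs on a keyed stream), a keyed `ValueState` could be
+requested through the same context:
+
+{% highlight java %}
+// hypothetical per-key counter; valid only on a KeyedStream
+ValueStateDescriptor<Integer> keyedDescriptor =
+    new ValueStateDescriptor<>("per-key-count", Integer.class);
+
+ValueState<Integer> perKeyCount =
+    context.getKeyedStateStore().getState(keyedDescriptor);
+{% endhighlight %}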
+
+#### ListCheckpointed
+
+The `ListCheckpointed` interface is a more limited variant of `CheckpointedFunction`,
+which only supports list-style state with the even-split redistribution scheme on
+restore. It also requires the implementation of two methods:
+
+{% highlight java %}
+List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+
+void restoreState(List<T> state) throws Exception;
+{% endhighlight %}
+
+On `snapshotState()` the operator should return a list of objects to checkpoint, and
+`restoreState()` has to handle such a list upon recovery. If the state is not
+re-partitionable, you can always return `Collections.singletonList(MY_STATE)` from
+`snapshotState()`, as sketched below.
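+
+As a hedged sketch (the class name and counting logic below are invented for
+illustration and are not part of Flink's API), a function that counts the elements
+seen by each parallel instance might use the interface like this:
+
+{% highlight java %}
+public class CountingFlatMap
+        extends RichFlatMapFunction<String, Long>
+        implements ListCheckpointed<Long> {
+
+    /** number of elements seen by this parallel instance */
+    private long count = 0L;
+
+    @Override
+    public void flatMap(String value, Collector<Long> out) {
+        count++;
+        out.collect(count);
+    }
+
+    @Override
+    public List<Long> snapshotState(long checkpointId, long timestamp) {
+        // a single counter is not re-partitionable, so we return a singleton list
+        return Collections.singletonList(count);
+    }
+
+    @Override
+    public void restoreState(List<Long> state) {
+        // on rescaling, the restored sublist may contain zero, one, or more counters
+        count = 0L;
+        for (Long c : state) {
+            count += c;
+        }
+    }
+}
+{% endhighlight %}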
+
+### Stateful Source Functions
+
+Stateful sources require a bit more care compared to other operators.
+In order to make updates to the state and emission of output atomic (which is required
+for exactly-once semantics on failure/recovery), the user is required to acquire a lock
+from the source's context.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public static class CounterSource
+        extends RichParallelSourceFunction<Long>
+        implements ListCheckpointed<Long> {
+
+    /** current offset for exactly once semantics */
+    private Long offset = 0L;
+
+    /** flag for job cancellation */
+    private volatile boolean isRunning = true;
+
+    @Override
+    public void run(SourceContext<Long> ctx) {
+        final Object lock = ctx.getCheckpointLock();
+
+        while (isRunning) {
+            // output and state update are atomic
+            synchronized (lock) {
+                ctx.collect(offset);
+                offset += 1;
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+
+    @Override
+    public List<Long> snapshotState(long checkpointId, long 
checkpointTimestamp) {
+        return Collections.singletonList(offset);
+    }
+
+    @Override
+    public void restoreState(List<Long> state) {
+        for (Long s : state) {
+            offset = s;
+        }
+    }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CounterSource
+       extends RichParallelSourceFunction[Long]
+       with ListCheckpointed[Long] {
+
+  @volatile
+  private var isRunning = true
+
+  private var offset = 0L
+
+  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
+    val lock = ctx.getCheckpointLock
+
+    while (isRunning) {
+      // output and state update are atomic
+      lock.synchronized({
+        ctx.collect(offset)
+
+        offset += 1
+      })
+    }
+  }
+
+  override def cancel(): Unit = isRunning = false
+
+  override def restoreState(state: util.List[Long]): Unit =
+    for (s <- state) {
+      offset = s
+    }
+
+  override def snapshotState(checkpointId: Long, timestamp: Long): 
util.List[Long] =
+    Collections.singletonList(offset)
+
+}
+{% endhighlight %}
+</div>
+</div>
+
+Some operators might need to know when a checkpoint has been fully acknowledged by
+Flink in order to communicate that to the outside world. In this case, see the
+`org.apache.flink.runtime.state.CheckpointListener` interface.
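+
+As a minimal, non-authoritative sketch (the body of the callback is made up for
+illustration), the `BufferingSink` above could additionally implement this interface
+to react once a checkpoint has been confirmed:
+
+{% highlight java %}
+public class BufferingSink
+        implements SinkFunction<Tuple2<String, Integer>>,
+                   CheckpointedFunction,
+                   CheckpointListener {
+
+    // ... fields and methods as in the example above ...
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        // called once the checkpoint with the given id has been acknowledged by all
+        // tasks; e.g. publish or commit side effects that were staged until now
+    }
+}
+{% endhighlight %}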
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/stream/state/state_backends.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/state_backends.md 
b/docs/dev/stream/state/state_backends.md
new file mode 100644
index 0000000..1357f2e
--- /dev/null
+++ b/docs/dev/stream/state/state_backends.md
@@ -0,0 +1,46 @@
+---
+title: "State Backends"
+nav-parent_id: streaming_state
+nav-pos: 5
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+Flink provides different state backends that specify how and where state is 
stored.
+
+State can be located on Java’s heap or off-heap. Depending on your state 
backend, Flink can also manage the state for the application, meaning Flink 
deals with the memory management (possibly spilling to disk if necessary) to 
allow applications to hold very large state. By default, the configuration file 
*flink-conf.yaml* determines the state backend for all Flink jobs.
+
+However, the default state backend can be overridden on a per-job basis, as 
shown below.
+
+For more information about the available state backends, their advantages, 
limitations, and configuration parameters see the corresponding section in 
[Deployment & Operations]({{ site.baseurl }}/ops/state/state_backends.html).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+env.setStateBackend(...);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.setStateBackend(...)
+{% endhighlight %}
+</div>
+</div>
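+
+As a sketch only (the HDFS URI below is a placeholder, and the filesystem backend is
+just one of the available options), a job could for example select its state backend
+explicitly like this:
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// placeholder URI; point this at durable storage reachable from all TaskManagers
+env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
+{% endhighlight %}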

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/dev/table/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/index.md b/docs/dev/table/index.md
index df2ccba..5845c95 100644
--- a/docs/dev/table/index.md
+++ b/docs/dev/table/index.md
@@ -74,7 +74,7 @@ Where to go next?
 
 * [Concepts & Common API]({{ site.baseurl }}/dev/table/common.html): Shared 
concepts and APIs of the Table API and SQL.
 * [Streaming Table API & SQL]({{ site.baseurl }}/dev/table/streaming.html): 
Streaming-specific documentation for the Table API or SQL such as configuration 
of time attributes and handling of updating results.
-* [Table API]({{ site.baseurl }}/dev/table/tableapi.html): Supported 
operations and API for the Table API.
+* [Table API]({{ site.baseurl }}/dev/table/tableApi.html): Supported 
operations and API for the Table API.
 * [SQL]({{ site.baseurl }}/dev/table/sql.html): Supported operations and 
syntax for SQL
 * [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html): 
Reading tables from and emitting tables to external storage systems.
 * [User-Defined Functions]({{ site.baseurl }}/dev/table/udfs.html): Definition 
and usage of user-defined functions.

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/internals/components.md
----------------------------------------------------------------------
diff --git a/docs/internals/components.md b/docs/internals/components.md
index b750ffd..cf3b659 100644
--- a/docs/internals/components.md
+++ b/docs/internals/components.md
@@ -54,6 +54,6 @@ You can click on the components in the figure to learn more.
 <area id="dataset" title="DataSet API" href="{{ site.baseurl 
}}/dev/batch/index.html" shape="rect" coords="382,177,697,255" />
 <area id="runtime" title="Runtime" href="{{ site.baseurl 
}}/concepts/runtime.html" shape="rect" coords="63,257,700,335" />
 <area id="local" title="Local" href="{{ site.baseurl 
}}/quickstart/setup_quickstart.html" shape="rect" coords="62,337,275,414" />
-<area id="cluster" title="Cluster" href="{{ site.baseurl 
}}/setup/cluster_setup.html" shape="rect" coords="273,336,486,413" />
-<area id="cloud" title="Cloud" href="{{ site.baseurl }}/setup/gce_setup.html" 
shape="rect" coords="485,336,700,414" />
+<area id="cluster" title="Cluster" href="{{ site.baseurl 
}}/ops/deployment/cluster_setup.html" shape="rect" coords="273,336,486,413" />
+<area id="cloud" title="Cloud" href="{{ site.baseurl 
}}/ops/deployment/gce_setup.html" shape="rect" coords="485,336,700,414" />
 </map>

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/internals/stream_checkpointing.md
----------------------------------------------------------------------
diff --git a/docs/internals/stream_checkpointing.md 
b/docs/internals/stream_checkpointing.md
index d701c5e..330b0aa 100644
--- a/docs/internals/stream_checkpointing.md
+++ b/docs/internals/stream_checkpointing.md
@@ -45,7 +45,7 @@ The system then restarts the operators and resets them to the 
latest successful
 point of the state snapshot. Any records that are processed as part of the 
restarted parallel dataflow are guaranteed to not
 have been part of the previously checkpointed state.
 
-*Note:* By default, checkpointing is disabled. See [Checkpointing]({{ 
site.baseurl }}/dev/stream/checkpointing.html) for details on how to enable and 
configure checkpointing.
+*Note:* By default, checkpointing is disabled. See [Checkpointing]({{ 
site.baseurl }}/dev/stream/state/checkpointing.html) for details on how to 
enable and configure checkpointing.
 
 *Note:* For this mechanism to realize its full guarantees, the data stream 
source (such as message queue or broker) needs to be able
 to rewind the stream to a defined recent point. [Apache 
Kafka](http://kafka.apache.org) has this ability and Flink's connector to
@@ -106,10 +106,10 @@ the barrier *n* from the other inputs as well. Otherwise, 
it would mix records t
 
 When operators contain any form of *state*, this state must be part of the 
snapshots as well. Operator state comes in different forms:
 
-  - *User-defined state*: This is state that is created and modified directly 
by the transformation functions (like `map()` or `filter()`). See [State in 
Streaming Applications]({{ site.baseurl }}/dev/stream/state.html) for details.
+  - *User-defined state*: This is state that is created and modified directly 
by the transformation functions (like `map()` or `filter()`). See [State in 
Streaming Applications]({{ site.baseurl }}/dev/stream/state/index.html) for 
details.
   - *System state*: This state refers to data buffers that are part of the 
operator's computation. A typical example for this state are the *window 
buffers*, inside which the system collects (and aggregates) records for windows 
until the window is evaluated and evicted.
 
-Operators snapshot their state at the point in time when they have received 
all snapshot barriers from their input streams, and before emitting the 
barriers to their output streams. At that point, all updates to the state from 
records before the barriers will have been made, and no updates that depend on 
records from after the barriers have been applied. Because the state of a 
snapshot may be large, it is stored in a configurable *[state backend]({{ 
site.baseurl }}/ops/state_backends.html)*. By default, this is the JobManager's 
memory, but for production use a distributed reliable storage should be 
configured (such as HDFS). After the state has been stored, the operator 
acknowledges the checkpoint, emits the snapshot barrier into the output 
streams, and proceeds.
+Operators snapshot their state at the point in time when they have received 
all snapshot barriers from their input streams, and before emitting the 
barriers to their output streams. At that point, all updates to the state from 
records before the barriers will have been made, and no updates that depend on 
records from after the barriers have been applied. Because the state of a 
snapshot may be large, it is stored in a configurable *[state backend]({{ 
site.baseurl }}/ops/state/state_backends.html)*. By default, this is the 
JobManager's memory, but for production use a distributed reliable storage 
should be configured (such as HDFS). After the state has been stored, the 
operator acknowledges the checkpoint, emits the snapshot barrier into the 
output streams, and proceeds.
 
 The resulting snapshot now contains:
 
@@ -145,7 +145,7 @@ It is possible to let an operator continue processing while 
it stores its state
 
 After receiving the checkpoint barriers on its inputs, the operator starts the 
asynchronous snapshot copying of its state. It immediately emits the barrier to 
its outputs and continues with the regular stream processing. Once the 
background copy process has completed, it acknowledges the checkpoint to the 
checkpoint coordinator (the JobManager). The checkpoint is now only complete 
after all sinks have received the barriers and all stateful operators have 
acknowledged their completed backup (which may be after the barriers reach the 
sinks).
 
-See [State Backends]({{ site.baseurl }}/ops/state_backends.html) for details 
on the state snapshots.
+See [State Backends]({{ site.baseurl }}/ops/state/state_backends.html) for 
details on the state snapshots.
 
 
 ## Recovery

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/internals/task_lifecycle.md
----------------------------------------------------------------------
diff --git a/docs/internals/task_lifecycle.md b/docs/internals/task_lifecycle.md
index cc557a1..fed2cb9 100644
--- a/docs/internals/task_lifecycle.md
+++ b/docs/internals/task_lifecycle.md
@@ -89,7 +89,7 @@ and skips any intermediate phases between the phase the 
operator was in when the
 **Checkpoints:** The `snapshotState()` method of the operator is called 
asynchronously to the rest of the methods described 
 above whenever a checkpoint barrier is received. Checkpoints are performed 
during the processing phase, *i.e.* after the 
 operator is opened and before it is closed. The responsibility of this method 
is to store the current state of the operator 
-to the specified [state backend]({{ site.baseurl }}/ops/state_backends.html) 
from where it will be retrieved when 
+to the specified [state backend]({{ site.baseurl 
}}/ops/state/state_backends.html) from where it will be retrieved when 
 the job resumes execution after a failure. Below we include a brief 
description of Flink's checkpointing mechanism, 
 and for a more detailed discussion on the principles around checkpointing in 
Flink please read the corresponding documentation: 
 [Data Streaming Fault Tolerance]({{ site.baseurl 
}}/internals/stream_checkpointing.html).
@@ -125,7 +125,7 @@ first step for the task is to retrieve its initial, 
task-wide state. This is don
 particularly important in two cases:
 
 1. when the task is recovering from a failure and restarts from the last 
successful checkpoint
-2. when resuming from a [savepoint]({{ site.baseurl }}/setup/savepoints.html). 
+2. when resuming from a [savepoint]({{ site.baseurl 
}}/ops/state/savepoints.html). 
 
 If it is the first time the task is executed, the initial task state is empty. 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/monitoring/debugging_classloading.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/debugging_classloading.md 
b/docs/monitoring/debugging_classloading.md
index 8c91e0f..d69870a 100644
--- a/docs/monitoring/debugging_classloading.md
+++ b/docs/monitoring/debugging_classloading.md
@@ -57,7 +57,7 @@ YARN classloading differs between single job deploymens and 
sessions:
 
 **Mesos**
 
-Mesos setups following [this documentation](../setup/mesos.html) currently 
behave very much like the a 
+Mesos setups following [this documentation](../ops/deployment/mesos.html) 
currently behave very much like a 
 YARN session: The TaskManager and JobManager processes are started with the 
Flink framework classes in classpath, job
 classes are loaded dynamically when the jobs are submitted.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/47070674/docs/monitoring/historyserver.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/historyserver.md b/docs/monitoring/historyserver.md
index 773f3be..e109512 100644
--- a/docs/monitoring/historyserver.md
+++ b/docs/monitoring/historyserver.md
@@ -71,7 +71,7 @@ historyserver.archive.fs.refresh-interval: 10000
 
 The contained archives are downloaded and cached in the local filesystem. The 
local directory for this is configured via `historyserver.web.tmpdir`.
 
-Check out the configuration page for a [complete list of configuration 
options]({{ site.baseurl }}/setup/config.html#history-server).
+Check out the configuration page for a [complete list of configuration 
options]({{ site.baseurl }}/ops/config.html#history-server).
 
 ## Available Requests
 
