[hotfix] [streaming api] Improve JavaDocs of the user-facing checkpointing and 
state interfaces


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dfb3e0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dfb3e0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dfb3e0e

Branch: refs/heads/release-1.2
Commit: 1dfb3e0e2a3b933c1a2378ccd4e462502b0da276
Parents: 083152c
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 19:07:54 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 19:44:41 2017 +0100

----------------------------------------------------------------------
 .../runtime/state/ManagedSnapshotContext.java   |  11 +-
 .../streaming/api/checkpoint/Checkpointed.java  |  35 ++++-
 .../checkpoint/CheckpointedAsynchronously.java  |  48 ++++---
 .../api/checkpoint/CheckpointedFunction.java    | 133 +++++++++++++++++--
 .../api/checkpoint/ListCheckpointed.java        | 125 +++++++++++++++--
 5 files changed, 297 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1dfb3e0e/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
index 14156a6..de65c5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java
@@ -29,13 +29,18 @@ import org.apache.flink.annotation.PublicEvolving;
 public interface ManagedSnapshotContext {
 
        /**
-        * Returns the Id of the checkpoint for which the snapshot is taken.
+        * Returns the ID of the checkpoint for which the snapshot is taken.
+        * 
+        * <p>The checkpoint ID is guaranteed to be strictly monotonically increasing across checkpoints.
+        * For two completed checkpoints <i>A</i> and <i>B</i>, {@code ID_B > ID_A} means that checkpoint
+        * <i>B</i> subsumes checkpoint <i>A</i>, i.e., checkpoint <i>B</i> contains a later state
+        * than checkpoint <i>A</i>.
         */
        long getCheckpointId();
 
        /**
-        * Returns the timestamp of the checkpoint for which the snapshot is 
taken.
+        * Returns the timestamp (wall clock time) at which the master node triggered the checkpoint
+        * for which the state snapshot is taken.
         */
        long getCheckpointTimestamp();
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1dfb3e0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index fb67ea7..dd93462 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -27,13 +27,38 @@ import java.io.Serializable;
  * checkpointed. The functions get a call whenever a checkpoint should take 
place
  * and return a snapshot of their state, which will be checkpointed.
  * 
- * <p>This interface marks a function as <i>synchronously</i> checkpointed. 
While the
- * state is written, the function is not called, so the function needs not 
return a
- * copy of its state, but may return a reference to its state. Functions that 
can
- * continue to work and mutate the state, even while the state snapshot is 
being accessed,
- * can implement the {@link CheckpointedAsynchronously} interface.</p>
+ * <h1>Deprecation and Replacement</h1>
+ *
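+ * The shortcut replacement for this interface is via {@link ListCheckpointed} and works
+ * as shown in the example below. The {@code ListCheckpointed} interface returns a list of
+ * elements (sub-states) rather than a single state object, so that the state can be
+ * redistributed when the parallelism of the function changes.
+ *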
+ * <pre>{@code
+ * public class ExampleFunction<T> implements MapFunction<T, T>, 
ListCheckpointed<Integer> {
+ *
+ *     private int count;
+ *
+ *     public List<Integer> snapshotState(long checkpointId, long timestamp) 
throws Exception {
+ *         return Collections.singletonList(this.count);
+ *     }
+ *
+ *     public void restoreState(List<Integer> state) throws Exception {
+ *         this.count = state.isEmpty() ? 0 : state.get(0);
+ *     }
+ *
+ *     public T map(T value) {
+ *         count++;
+ *         return value;
+ *     }
+ * }
+ * }</pre>
  * 
  * @param <T> The type of the operator state.
+ * 
+ * @deprecated Please use {@link ListCheckpointed} as illustrated above, or
+ *             {@link CheckpointedFunction} for more control over the 
checkpointing process.
  */
 @Deprecated
 @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/1dfb3e0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
index 6fcc1d5..4bafd90 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java
@@ -18,38 +18,44 @@
 
 package org.apache.flink.streaming.api.checkpoint;
 
-
 import org.apache.flink.annotation.PublicEvolving;
 
 import java.io.Serializable;
 
 /**
- * This interface marks a function/operator as <i>asynchronously 
checkpointed</i>.
- * Similar to the {@link Checkpointed} interface, the function must produce a
- * snapshot of its state. However, the function must be able to continue 
working
- * and mutating its state without mutating the returned state snapshot.
+ * This interface marks a function/operator as checkpointed similar to the
+ * {@link Checkpointed} interface, but gives the Flink framework the option to
+ * perform the checkpoint asynchronously. Note that asynchronous checkpointing 
for 
+ * this interface has not been implemented.
  * 
- * <p>Asynchronous checkpoints are desirable, because they allow the data 
streams at the
- * point of the checkpointed function/operator to continue running while the 
checkpoint
- * is in progress.</p>
+ * <h1>Deprecation and Replacement</h1>
+ * 
+ * The shortcut replacement for this interface is via {@link ListCheckpointed} 
and works
+ * as shown in the example below. Please refer to the JavaDocs of {@link 
ListCheckpointed} for
+ * a more detailed description of how to use the new interface.
  * 
- * <p>To be able to support asynchronous snapshots, the state returned by the
- * {@link #snapshotState(long, long)} method is typically a copy or shadow copy
- * of the actual state.</p>
- * @deprecated Please use {@link ListCheckpointed} and {@link 
CheckpointedFunction}.
- *
- * The short cut replacement via {@link ListCheckpointed}
  * <pre>{@code
- *  public class ExampleOperator implements ListCheckpointed<Integer> {
+ * public class ExampleFunction<T> implements MapFunction<T, T>, 
ListCheckpointed<Integer> {
+ * 
+ *     private int count;
  *
- *             public List<Integer> snapshotState(long checkpointId, long 
timestamp) throws Exception {
- *                      return Collections.singletonList(this.value);
- *             }
+ *     public List<Integer> snapshotState(long checkpointId, long timestamp) 
throws Exception {
+ *         return Collections.singletonList(this.count);
+ *     }
  *
- *             public void restoreState(List<Integer> state) throws Exception {
- *                     this.value = state.get(0);
- *             }
+ *     public void restoreState(List<Integer> state) throws Exception {
+ *         this.count = state.isEmpty() ? 0 : state.get(0);
+ *     }
+ * 
+ *     public T map(T value) {
+ *         count++;
+ *         return value;
+ *     }
+ * }
  * }</pre>
+ *
+ * @deprecated Please use {@link ListCheckpointed} and {@link 
CheckpointedFunction} instead,
+ *             as illustrated in the example above.
  */
 @Deprecated
 @PublicEvolving

http://git-wip-us.apache.org/repos/asf/flink/blob/1dfb3e0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
index 37d8244..51ac5db 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java
@@ -19,29 +19,135 @@
 package org.apache.flink.streaming.api.checkpoint;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 
 /**
+ * This is the core interface for <i>stateful transformation functions</i>, 
meaning functions
+ * that maintain state across individual stream records.
+ * While more lightweight interfaces exist as shortcuts for various types of 
state, this interface offer the
+ * greatest flexibility in managing both <i>keyed state</i> and <i>operator 
state</i>.
+ * 
+ * <p>The section <a href="#shortcuts">Shortcuts</a> illustrates the common lightweight
+ * ways to set up stateful functions that are typically used instead of the full-fledged
+ * abstraction represented by this interface.
+ * 
+ * <h1>Initialization</h1>
+ * 
+ * The {@link CheckpointedFunction#initializeState(FunctionInitializationContext)} method is called when
+ * the parallel instance of the transformation function is created during distributed execution.
+ * The method gives access to the {@link FunctionInitializationContext} which in turn gives access
+ * to the {@link OperatorStateStore} and {@link KeyedStateStore}.
+ * 
+ * <p>The {@code OperatorStateStore} and {@code KeyedStateStore} give access 
to the data structures
+ * in which state should be stored for Flink to transparently manage and 
checkpoint it, such as
+ * {@link org.apache.flink.api.common.state.ValueState} or {@link 
org.apache.flink.api.common.state.ListState}.
+ *
+ * <p><i>Note:</i> The {@code KeyedStateStore} can only be used when the 
transformation supports
+ * <i>keyed state</i>, i.e., when it is applied on a keyed stream (after a 
{@code keyBy(...)}).
+ * 
+ * <h1>Snapshot</h1>
+ * 
+ * The {@link CheckpointedFunction#snapshotState(FunctionSnapshotContext)} method is called whenever a
+ * checkpoint takes a state snapshot of the transformation function. Inside 
this method, functions typically
+ * make sure that the checkpointed data structures (obtained in the 
initialization phase) are up
+ * to date for a snapshot to be taken. The given snapshot context gives access 
to the metadata
+ * of the checkpoint.
+ * 
+ * <p>In addition, functions can use this method as a hook to 
flush/commit/synchronize with
+ * external systems.
+ *
+ * <h1>Example</h1>
+ * 
+ * The code example below illustrates how to use this interface for a function 
that keeps counts
+ * of events per key and per parallel partition (parallel instance of the 
transformation function
+ * during distributed execution).
+ * The example also handles changes of parallelism, which affect the count-per-parallel-partition, by
+ * adding up the counters of partitions that get merged on scale-down. Note 
that this is a
+ * toy example, but should illustrate the basic skeleton for a stateful 
function.
+ * 
+ * <pre>{@code
+ * public class MyFunction<T> implements MapFunction<T, T>, 
CheckpointedFunction {
+ *
+ *     private ReducingState<Long> countPerKey;
+ *     private ListState<Long> countPerPartition;
+ *
+ *     private long localCount;
+ *
+ *     public void initializeState(FunctionInitializationContext context) 
throws Exception {
+ *         // get the state data structure for the per-key state
+ *         countPerKey = context.getKeyedStateStore().getReducingState(
+ *                 new ReducingStateDescriptor<>("perKeyCount", new 
AddFunction<>(), Long.class));
+ *
+ *         // get the state data structure for the per-partition state
+ *         countPerPartition = 
context.getOperatorStateStore().getOperatorState(
+ *                 new ListStateDescriptor<>("perPartitionCount", Long.class));
  *
- * Similar to @{@link Checkpointed}, this interface must be implemented by 
functions that have potentially
- * repartitionable state that needs to be checkpointed. Methods from this 
interface are called upon checkpointing and
- * initialization of state.
+ *         // initialize the "local count variable" based on the operator state
+ *         for (Long l : countPerPartition.get()) {
+ *             localCount += l;
+ *         }
+ *     }
  *
- * On {@link #initializeState(FunctionInitializationContext)} the implementing 
class receives a
- * {@link FunctionInitializationContext} which provides access to the {@link 
OperatorStateStore} (all) and
- * {@link org.apache.flink.api.common.state.KeyedStateStore} (only for keyed 
operators). Those allow to register
- * managed operator / keyed  user states. Furthermore, the context provides 
information whether or the operator was
- * restored.
+ *     public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+ *         // the keyed state is always up to date anyway
+ *         // just bring the per-partition state in shape
+ *         countPerPartition.clear();
+ *         countPerPartition.add(localCount);
+ *     }
  *
+ *     public T map(T value) throws Exception {
+ *         // update the states
+ *         countPerKey.add(1L);
+ *         localCount++;
  *
- * In {@link #snapshotState(FunctionSnapshotContext)} the implementing class 
must ensure that all operator / keyed state
- * is passed to user states that have been registered during initialization, 
so that it is visible to the system
- * backends for checkpointing.
+ *         return value;
+ *     }
+ * }
+ * }</pre>
+ * 
+ * <hr>
+ * 
+ * <h1><a name="shortcuts">Shortcuts</a></h1>
+ * 
+ * There are various ways that transformation functions can use state without 
implementing the
+ * full-fledged {@code CheckpointedFunction} interface:
+ * 
+ * <h4>Operator State</h4>
  *
+ * Checkpointing some state that is part of the function object itself is 
possible in a simpler way
+ * by directly implementing the {@link ListCheckpointed} interface. 
+ * That mechanism is similar to the previously used {@link Checkpointed} 
interface.
+ * 
+ * <h4>Keyed State</h4>
+ * 
+ * Access to keyed state is possible via the {@link RuntimeContext}'s methods:
+ * <pre>{@code
+ * public class CountPerKeyFunction<T> extends RichMapFunction<T, T> {
+ * 
+ *     private ValueState<Long> count;
+ *     
+ *     public void open(Configuration cfg) throws Exception {
+ *         count = getRuntimeContext().getState(new 
ValueStateDescriptor<>("myCount", Long.class));
+ *     }
+ *     
+ *     public T map(T value) throws Exception {
+ *         Long current = count.get();
+ *         count.update(current == null ? 1L : current + 1);
+ *         
+ *         return value;
+ *     }
+ * }
+ * }</pre>
+ * 
+ * @see ListCheckpointed
+ * @see RuntimeContext
  */
 @PublicEvolving
+@SuppressWarnings("deprecation")
 public interface CheckpointedFunction {
 
        /**
@@ -55,9 +161,8 @@ public interface CheckpointedFunction {
        void snapshotState(FunctionSnapshotContext context) throws Exception;
 
        /**
-        * This method is called when an operator is initialized, so that the 
function can set up it's state through
-        * the provided context. Initialization typically includes registering 
user states through the state stores
-        * that the context offers.
+        * This method is called when the parallel function instance is created during distributed
+        * execution. Functions typically set up the data structures that store their state in this method.
         *
         * @param context the context for initializing the operator
         * @throws Exception

http://git-wip-us.apache.org/repos/asf/flink/blob/1dfb3e0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
index 5e85dc1..84a9700 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java
@@ -19,29 +19,118 @@
 package org.apache.flink.streaming.api.checkpoint;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.configuration.Configuration;
 
 import java.io.Serializable;
 import java.util.List;
 
 /**
- * This method must be implemented by functions that have state that needs to 
be
- * checkpointed. The functions get a call whenever a checkpoint should take 
place
- * and return a snapshot of their state as a list of redistributable 
sub-states,
- * which will be checkpointed.
+ * This interface can be implemented by functions that want to store state in 
checkpoints.
+ * It can be used in a similar way as the deprecated {@link Checkpointed} 
interface, but
+ * supports <b>list-style state redistribution</b> for cases when the 
parallelism of the
+ * transformation is changed.
  *
+ * <p>Implementing this interface is a shortcut for obtaining the default 
{@code ListState}
+ * from the {@link OperatorStateStore}. Using the {@code OperatorStateStore} 
directly gives
+ * more flexible options to use operator state, for example controlling the 
serialization
+ * of the state objects, or having multiple named states.
+ * 
+ * <h2>State Redistribution</h2>
+ * 
+ * State redistribution happens when the parallelism of the operator is 
changed.
+ * State redistribution of <i>operator state</i> (to which category the state handled by this
+ * interface belongs) always goes through a checkpoint, so it appears
+ * to the transformation functions like a failure/recovery combination, where 
recovery happens
+ * with a different parallelism.
+ * 
+ * <p>Conceptually, the state in the checkpoint is the concatenated list of 
all lists
+ * returned by the parallel transformation function instances. When restoring 
from a checkpoint,
+ * the list is divided into sub-lists that are assigned to each parallel 
function instance. 
+ * 
+ * <p>The following sketch illustrates the state redistribution. The function runs with parallelism
+ * <i>3</i>. The first two parallel instances of the function return lists with two state elements,
+ * and the third one returns a list with one element.
+ * <pre>
+ *    func_1        func_2     func_3
+ * +----+----+   +----+----+   +----+
+ * | S1 | S2 |   | S3 | S4 |   | S5 |
+ * +----+----+   +----+----+   +----+
+ * </pre>
+ * 
+ * Recovering the checkpoint with <i>parallelism = 5</i> yields the following 
state assignment:
+ * <pre>
+ * func_1   func_2   func_3   func_4   func_5
+ * +----+   +----+   +----+   +----+   +----+
+ * | S1 |   | S2 |   | S3 |   | S4 |   | S5 |
+ * +----+   +----+   +----+   +----+   +----+
+ * </pre>
+ *
+ * Recovering the checkpoint with <i>parallelism = 2</i> yields the following state assignment:
+ * <pre>
+ *      func_1          func_2
+ * +----+----+----+   +----+----+
+ * | S1 | S2 | S3 |   | S4 | S5 |
+ * +----+----+----+   +----+----+
+ * </pre>
+ * 
+ * <h2>Example</h2>
+ * 
+ * The following example illustrates how to implement a {@code MapFunction} that counts all elements
+ * passing through it, keeping the total count accurate under rescaling (changes of parallelism).
+ * 
+ * <pre>{@code
+ * public class CountingFunction<T> implements MapFunction<T, Tuple2<T, 
Long>>, ListCheckpointed<Long> {
+ * 
+ *     // this count is the number of elements in the parallel subtask 
+ *     private long count;
+ *
+ *     {@literal @}Override
+ *     public List<Long> snapshotState(long checkpointId, long timestamp) {
+ *         // return a single element - our count
+ *         return Collections.singletonList(count);
+ *     }
+ * 
+ *     {@literal @}Override
+ *     public void restoreState(List<Long> state) throws Exception {
+ *         // in case of scale in, this adds up counters from different original subtasks
+ *         // in case of scale out, this list may be empty
+ *         for (Long l : state) {
+ *             count += l;
+ *         }
+ *     }
+ *
+ *     {@literal @}Override
+ *     public Tuple2<T, Long> map(T value) {
+ *         count++;
+ *         return new Tuple2<>(value, count);
+ *     }
+ * }
+ * }</pre>
+ * 
  * @param <T> The type of the operator state.
  */
 @PublicEvolving
 public interface ListCheckpointed<T extends Serializable> {
 
        /**
-        * Gets the current state of the function of operator. The state must 
reflect the result of all
-        * prior invocations to this function.
+        * Gets the current state of the function. The state must reflect the 
result of all prior
+        * invocations to this function.
+        * 
+        * <p>The returned list should contain one entry for each redistributable unit of state. See
+        * the {@link ListCheckpointed class docs} for an illustration of how list-style state
+        * redistribution works.
+        * redistribution works.
+        * 
+        * <p>As a special case, the returned list may be empty (if the operator has no state)
+        * or it may contain a single element (if the operator state is indivisible).
         *
-        * @param checkpointId The ID of the checkpoint.
-        * @param timestamp Timestamp of the checkpoint.
+        * @param checkpointId The ID of the checkpoint - a unique and monotonically increasing value.
+        * @param timestamp The wall clock timestamp when the checkpoint was 
triggered by the master.
+        *                     
         * @return The operator state in a list of redistributable, atomic 
sub-states.
         *         Should not return null, but empty list instead.
+        *         
         * @throws Exception Thrown if the creation of the state object failed. 
This causes the
         *                   checkpoint to fail. The system may decide to fail 
the operation (and trigger
         *                   recovery), or to discard this checkpoint attempt 
and to continue running
@@ -51,11 +140,23 @@ public interface ListCheckpointed<T extends Serializable> {
 
        /**
         * Restores the state of the function or operator to that of a previous 
checkpoint.
-        * This method is invoked when a function is executed as part of a 
recovery run.
-        * <p>
-        * Note that restoreState() is called before open().
-        *
+        * This method is invoked when the function is executed after a failure 
recovery.
+        * The state list may be empty if no state is to be recovered by the 
particular parallel instance
+        * of the function.
+        * 
+        * <p>The given state list will contain all the <i>sub states</i> that this parallel
+        * instance of the function needs to handle. Refer to the {@link ListCheckpointed class docs}
+        * for an illustration of how list-style state redistribution works.
+        * 
+        * <p><b>Important:</b> When implementing this interface together with 
{@link RichFunction},
+        * then the {@code restoreState()} method is called before {@link 
RichFunction#open(Configuration)}.
+        * 
         * @param state The state to be restored as a list of atomic sub-states.
+        * 
+        * @throws Exception Throwing an exception in this method causes the 
recovery to fail.
+        *                   The exact consequence depends on the configured 
failure handling strategy,
+        *                   but typically the system will re-attempt the 
recovery, or try recovering
+        *                   from a different checkpoint.
         */
        void restoreState(List<T> state) throws Exception;
 }

Reply via email to