[hotfix] [streaming api] Improve JavaDocs of the user-facing checkpointing and state interfaces
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1dfb3e0e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1dfb3e0e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1dfb3e0e Branch: refs/heads/release-1.2 Commit: 1dfb3e0e2a3b933c1a2378ccd4e462502b0da276 Parents: 083152c Author: Stephan Ewen <se...@apache.org> Authored: Mon Jan 23 19:07:54 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Jan 23 19:44:41 2017 +0100 ---------------------------------------------------------------------- .../runtime/state/ManagedSnapshotContext.java | 11 +- .../streaming/api/checkpoint/Checkpointed.java | 35 ++++- .../checkpoint/CheckpointedAsynchronously.java | 48 ++++--- .../api/checkpoint/CheckpointedFunction.java | 133 +++++++++++++++++-- .../api/checkpoint/ListCheckpointed.java | 125 +++++++++++++++-- 5 files changed, 297 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1dfb3e0e/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java index 14156a6..de65c5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ManagedSnapshotContext.java @@ -29,13 +29,18 @@ import org.apache.flink.annotation.PublicEvolving; public interface ManagedSnapshotContext { /** - * Returns the Id of the checkpoint for which the snapshot is taken. + * Returns the ID of the checkpoint for which the snapshot is taken. 
+ * + * <p>The checkpoint ID is guaranteed to be strictly monotonically increasing across checkpoints. + * For two completed checkpoints <i>A</i> and <i>B</i>, {@code ID_B > ID_A} means that checkpoint + * <i>B</i> subsumes checkpoint <i>A</i>, i.e., checkpoint <i>B</i> contains a later state + * than checkpoint <i>A</i>. */ long getCheckpointId(); /** - * Returns the timestamp of the checkpoint for which the snapshot is taken. + * Returns the timestamp (wall clock time) when the master node triggered the checkpoint for which + * the state snapshot is taken. */ long getCheckpointTimestamp(); - } http://git-wip-us.apache.org/repos/asf/flink/blob/1dfb3e0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java index fb67ea7..dd93462 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java @@ -27,13 +27,38 @@ import java.io.Serializable; * checkpointed. The functions get a call whenever a checkpoint should take place * and return a snapshot of their state, which will be checkpointed. * - * <p>This interface marks a function as <i>synchronously</i> checkpointed. While the - * state is written, the function is not called, so the function needs not return a - * copy of its state, but may return a reference to its state.
Functions that can - * continue to work and mutate the state, even while the state snapshot is being accessed, - * can implement the {@link CheckpointedAsynchronously} interface.</p> + * <h1>Deprecation and Replacement</h1> + * + * The shortcut replacement for this interface is via {@link ListCheckpointed} and works + * as shown in the example below. The {@code ListCheckpointed} interface returns a list of + * elements (sub-states) that can be redistributed across parallel instances when the + * parallelism changes; a function with indivisible state simply returns a single-element list. + * + * <pre>{@code + * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> { + * + * private int count; + * + * public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + * return Collections.singletonList(this.count); + * } + * + * public void restoreState(List<Integer> state) throws Exception { + * this.count = state.isEmpty() ? 0 : state.get(0); + * } + * + * public T map(T value) { + * count++; + * return value; + * } + * } + * }</pre> * * @param <T> The type of the operator state. + * + * @deprecated Please use {@link ListCheckpointed} as illustrated above, or + * {@link CheckpointedFunction} for more control over the checkpointing process.
*/ @Deprecated @PublicEvolving http://git-wip-us.apache.org/repos/asf/flink/blob/1dfb3e0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java index 6fcc1d5..4bafd90 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java @@ -18,38 +18,44 @@ package org.apache.flink.streaming.api.checkpoint; - import org.apache.flink.annotation.PublicEvolving; import java.io.Serializable; /** - * This interface marks a function/operator as <i>asynchronously checkpointed</i>. - * Similar to the {@link Checkpointed} interface, the function must produce a - * snapshot of its state. However, the function must be able to continue working - * and mutating its state without mutating the returned state snapshot. + * This interface marks a function/operator as checkpointed similar to the + * {@link Checkpointed} interface, but gives the Flink framework the option to + * perform the checkpoint asynchronously. Note that asynchronous checkpointing for + * this interface has not been implemented. * - * <p>Asynchronous checkpoints are desirable, because they allow the data streams at the - * point of the checkpointed function/operator to continue running while the checkpoint - * is in progress.</p> + * <h1>Deprecation and Replacement</h1> + * + * The shortcut replacement for this interface is via {@link ListCheckpointed} and works + * as shown in the example below. 
Please refer to the JavaDocs of {@link ListCheckpointed} for + * a more detailed description of how to use the new interface. * - * <p>To be able to support asynchronous snapshots, the state returned by the - * {@link #snapshotState(long, long)} method is typically a copy or shadow copy - * of the actual state.</p> - * @deprecated Please use {@link ListCheckpointed} and {@link CheckpointedFunction}. - * - * The short cut replacement via {@link ListCheckpointed} * <pre>{@code - * public class ExampleOperator implements ListCheckpointed<Integer> { + * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> { + * + * private int count; * - * public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { - * return Collections.singletonList(this.value); - * } + * public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { + * return Collections.singletonList(this.count); + * } * - * public void restoreState(List<Integer> state) throws Exception { - * this.value = state.get(0); - * } + * public void restoreState(List<Integer> state) throws Exception { + * this.count = state.isEmpty() ? 0 : state.get(0); + * } + * + * public T map(T value) { + * count++; + * return value; + * } + * } * }</pre> + * + * @deprecated Please use {@link ListCheckpointed} as illustrated in the example above, or + * {@link CheckpointedFunction} for more control over the checkpointing process.
*/ @Deprecated @PublicEvolving http://git-wip-us.apache.org/repos/asf/flink/blob/1dfb3e0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java index 37d8244..51ac5db 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java @@ -19,29 +19,135 @@ package org.apache.flink.streaming.api.checkpoint; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.KeyedStateStore; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; /** + * This is the core interface for <i>stateful transformation functions</i>, meaning functions + * that maintain state across individual stream records. + * While more lightweight interfaces exist as shortcuts for various types of state, this interface offers the + * greatest flexibility in managing both <i>keyed state</i> and <i>operator state</i>. + * + * <p>The section <a href="#shortcuts">Shortcuts</a> illustrates the common lightweight + * ways to set up stateful functions typically used instead of the full-fledged + * abstraction represented by this interface. + * + * <h1>Initialization</h1> + * + * The {@link CheckpointedFunction#initializeState(FunctionInitializationContext)} method is called when + * the parallel instance of the transformation function is created during distributed execution.
+ * The method gives access to the {@link FunctionInitializationContext} which in turn gives access + * to the {@link OperatorStateStore} and {@link KeyedStateStore}. + * + * <p>The {@code OperatorStateStore} and {@code KeyedStateStore} give access to the data structures + * in which state should be stored for Flink to transparently manage and checkpoint it, such as + * {@link org.apache.flink.api.common.state.ValueState} or {@link org.apache.flink.api.common.state.ListState}. + * + * <p><i>Note:</i> The {@code KeyedStateStore} can only be used when the transformation supports + * <i>keyed state</i>, i.e., when it is applied to a keyed stream (after a {@code keyBy(...)}). + * + * <h1>Snapshot</h1> + * + * The {@link CheckpointedFunction#snapshotState(FunctionSnapshotContext)} method is called whenever a + * checkpoint takes a state snapshot of the transformation function. Inside this method, functions typically + * make sure that the checkpointed data structures (obtained in the initialization phase) are up + * to date for a snapshot to be taken. The given snapshot context gives access to the metadata + * of the checkpoint. + * + * <p>In addition, functions can use this method as a hook to flush/commit/synchronize with + * external systems. + * + * <h1>Example</h1> + * + * The code example below illustrates how to use this interface for a function that keeps counts + * of events per key and per parallel partition (parallel instance of the transformation function + * during distributed execution). + * The example also handles changes of parallelism, which affect the count-per-parallel-partition by + * adding up the counters of partitions that get merged on scale-down. Note that this is a + * toy example, but it should illustrate the basic skeleton for a stateful function.
+ * + * <pre>{@code + * public class MyFunction<T> implements MapFunction<T, T>, CheckpointedFunction { + * + * private ReducingState<Long> countPerKey; + * private ListState<Long> countPerPartition; + * + * private long localCount; + * + * public void initializeState(FunctionInitializationContext context) throws Exception { + * // get the state data structure for the per-key state + * countPerKey = context.getKeyedStateStore().getReducingState( + * new ReducingStateDescriptor<>("perKeyCount", new AddFunction<>(), Long.class)); + * + * // get the state data structure for the per-partition (operator) state + * countPerPartition = context.getOperatorStateStore().getOperatorState( + * new ListStateDescriptor<>("perPartitionCount", Long.class)); * - * Similar to @{@link Checkpointed}, this interface must be implemented by functions that have potentially - * repartitionable state that needs to be checkpointed. Methods from this interface are called upon checkpointing and - * initialization of state. + * // initialize the "local count variable" based on the operator state + * for (Long l : countPerPartition.get()) { + * localCount += l; + * } + * } * - * On {@link #initializeState(FunctionInitializationContext)} the implementing class receives a - * {@link FunctionInitializationContext} which provides access to the {@link OperatorStateStore} (all) and - * {@link org.apache.flink.api.common.state.KeyedStateStore} (only for keyed operators). Those allow to register - * managed operator / keyed user states. Furthermore, the context provides information whether or the operator was - * restored.
+ * public void snapshotState(FunctionSnapshotContext context) throws Exception { + * // the keyed state is always up to date anyway + * // just bring the per-partition state in shape + * countPerPartition.clear(); + * countPerPartition.add(localCount); + * } * + * public T map(T value) throws Exception { + * // update the states + * countPerKey.add(1L); + * localCount++; * - * In {@link #snapshotState(FunctionSnapshotContext)} the implementing class must ensure that all operator / keyed state - * is passed to user states that have been registered during initialization, so that it is visible to the system - * backends for checkpointing. + * return value; + * } + * } + * }</pre> + * + * <hr> + * + * <h1><a name="shortcuts">Shortcuts</a></h1> + * + * There are various ways that transformation functions can use state without implementing the + * full-fledged {@code CheckpointedFunction} interface: + * + * <h4>Operator State</h4> * + * Checkpointing some state that is part of the function object itself is possible in a simpler way + * by directly implementing the {@link ListCheckpointed} interface. + * That mechanism is similar to the previously used {@link Checkpointed} interface. + * + * <h4>Keyed State</h4> + * + * Access to keyed state is possible via the {@link RuntimeContext}'s methods: + * <pre>{@code + * public class CountPerKeyFunction<T> extends RichMapFunction<T, T> { + * + * private ValueState<Long> count; + * + * public void open(Configuration cfg) throws Exception { + * count = getRuntimeContext().getState(new ValueStateDescriptor<>("myCount", Long.class)); + * } + * + * public T map(T value) throws Exception { + * Long current = count.value(); + * count.update(current == null ?
1L : current + 1); + * + * return value; + * } + * } + * }</pre> + * + * @see ListCheckpointed + * @see RuntimeContext */ @PublicEvolving +@SuppressWarnings("deprecation") public interface CheckpointedFunction { /** @@ -55,9 +161,8 @@ public interface CheckpointedFunction { void snapshotState(FunctionSnapshotContext context) throws Exception; /** - * This method is called when an operator is initialized, so that the function can set up it's state through - * the provided context. Initialization typically includes registering user states through the state stores - * that the context offers. + * This method is called when the parallel function instance is created during distributed + * execution. Functions typically set up their state storing data structures in this method. * * @param context the context for initializing the operator * @throws Exception http://git-wip-us.apache.org/repos/asf/flink/blob/1dfb3e0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java index 5e85dc1..84a9700 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ListCheckpointed.java @@ -19,29 +19,118 @@ package org.apache.flink.streaming.api.checkpoint; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.RichFunction; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.configuration.Configuration; import java.io.Serializable; import java.util.List; /** - * This method must be implemented by functions that have state that needs to be - * 
checkpointed. The functions get a call whenever a checkpoint should take place - * and return a snapshot of their state as a list of redistributable sub-states, - * which will be checkpointed. + * This interface can be implemented by functions that want to store state in checkpoints. + * It can be used in a similar way to the deprecated {@link Checkpointed} interface, but + * supports <b>list-style state redistribution</b> for cases when the parallelism of the + * transformation is changed. * + * <p>Implementing this interface is a shortcut for obtaining the default {@code ListState} + * from the {@link OperatorStateStore}. Using the {@code OperatorStateStore} directly gives + * more flexible options to use operator state, for example controlling the serialization + * of the state objects, or having multiple named states. + * + * <h2>State Redistribution</h2> + * + * State redistribution happens when the parallelism of the operator is changed. + * State redistribution of <i>operator state</i> (to which category the state handled by this + * interface belongs) always goes through a checkpoint, so it appears + * to the transformation functions like a failure/recovery combination, where recovery happens + * with a different parallelism. + * + * <p>Conceptually, the state in the checkpoint is the concatenated list of all lists + * returned by the parallel transformation function instances. When restoring from a checkpoint, + * the list is divided into sub-lists that are assigned to each parallel function instance. + * + * <p>The following sketch illustrates the state redistribution. The function runs with parallelism + * <i>3</i>. The first two parallel instances of the function return lists with two state elements, + * the third one a list with one element.
+ * <pre> + * func_1 func_2 func_3 + * +----+----+ +----+----+ +----+ + * | S1 | S2 | | S3 | S4 | | S5 | + * +----+----+ +----+----+ +----+ + * </pre> + * + * Recovering the checkpoint with <i>parallelism = 5</i> yields the following state assignment: + * <pre> + * func_1 func_2 func_3 func_4 func_5 + * +----+ +----+ +----+ +----+ +----+ + * | S1 | | S2 | | S3 | | S4 | | S5 | + * +----+ +----+ +----+ +----+ +----+ + * </pre> + * + * Recovering the checkpoint with <i>parallelism = 2</i> yields the following state assignment: + * <pre> + * func_1 func_2 + * +----+----+----+ +----+----+ + * | S1 | S2 | S3 | | S4 | S5 | + * +----+----+----+ +----+----+ + * </pre> + * + * <h2>Example</h2> + * + * The following example illustrates how to implement a {@code MapFunction} that counts all elements + * passing through it, keeping the total count accurate under re-scaling (changes of parallelism). + * + * <pre>{@code + * public class CountingFunction<T> implements MapFunction<T, Tuple2<T, Long>>, ListCheckpointed<Long> { + * + * // this count is the number of elements in the parallel subtask + * private long count; + * + * {@literal @}Override + * public List<Long> snapshotState(long checkpointId, long timestamp) { + * // return a single element - our count + * return Collections.singletonList(count); + * } + * + * {@literal @}Override + * public void restoreState(List<Long> state) throws Exception { + * // in case of scale in, this adds up counters from different original subtasks + * // in case of scale out, this list may be empty + * for (Long l : state) { + * count += l; + * } + * } + * + * {@literal @}Override + * public Tuple2<T, Long> map(T value) { + * count++; + * return new Tuple2<>(value, count); + * } + * } + * }</pre> + * * @param <T> The type of the operator state. */ @PublicEvolving public interface ListCheckpointed<T extends Serializable> { /** - * Gets the current state of the function of operator.
The state must reflect the result of all - * prior invocations to this function. + * Gets the current state of the function. The state must reflect the result of all prior + * invocations to this function. + * + * <p>The returned list should contain one entry for each redistributable unit of state. See + * the {@link ListCheckpointed class docs} for an illustration of how list-style state + * redistribution works. + * + * <p>As a special case, the returned list may be empty (if the operator has no state; return an + * empty list rather than {@code null}) or it may contain a single element (if the operator state is indivisible). * - * @param checkpointId The ID of the checkpoint. - * @param timestamp Timestamp of the checkpoint. + * @param checkpointId The ID of the checkpoint - a unique and monotonically increasing value. + * @param timestamp The wall clock timestamp when the checkpoint was triggered by the master. + * * @return The operator state in a list of redistributable, atomic sub-states. * Should not return null, but empty list instead. + * * @throws Exception Thrown if the creation of the state object failed. This causes the * checkpoint to fail. The system may decide to fail the operation (and trigger * recovery), or to discard this checkpoint attempt and to continue running @@ -51,11 +140,23 @@ public interface ListCheckpointed<T extends Serializable> { /** * Restores the state of the function or operator to that of a previous checkpoint. - * This method is invoked when a function is executed as part of a recovery run. - * <p> - * Note that restoreState() is called before open(). - * + * This method is invoked when the function is executed after a failure recovery or after + * a change of parallelism (which is also restored from a checkpoint). + * The state list may be empty if no state is to be recovered by the particular parallel instance + * of the function. + * + * <p>The given state list will contain all the <i>sub-states</i> that this parallel + * instance of the function needs to handle.
Refer to the {@link ListCheckpointed class docs} + * for an illustration of how list-style state redistribution works. + * + * <p><b>Important:</b> When implementing this interface together with {@link RichFunction}, + * the {@code restoreState()} method is called before {@link RichFunction#open(Configuration)}. + * * @param state The state to be restored as a list of atomic sub-states. + * + * @throws Exception Throwing an exception in this method causes the recovery to fail. + * The exact consequence depends on the configured failure handling strategy, + * but typically the system will re-attempt the recovery, or try recovering + * from a different checkpoint. */ void restoreState(List<T> state) throws Exception; }