Flink runner: specify CheckpointingMode through PipelineOptions.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b8035ae7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b8035ae7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b8035ae7 Branch: refs/heads/master Commit: b8035ae7ad226bd0261a70fb8e0041e0f07e6dfe Parents: 1866a01 Author: Pei He <p...@apache.org> Authored: Sat May 27 14:41:26 2017 +0800 Committer: Pei He <hepei...@alibaba-inc.com> Committed: Sun Jun 4 16:18:36 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/flink/FlinkPipelineExecutionEnvironment.java | 2 +- .../org/apache/beam/runners/flink/FlinkPipelineOptions.java | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b8035ae7/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 7765a00..98f7c5a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -227,7 +227,7 @@ class FlinkPipelineExecutionEnvironment { if (checkpointInterval < 1) { throw new IllegalArgumentException("The checkpoint interval must be positive"); } - flinkStreamEnv.enableCheckpointing(checkpointInterval); + flinkStreamEnv.enableCheckpointing(checkpointInterval, options.getCheckpointingMode()); boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled(); boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation(); if (externalizedCheckpoint) { http://git-wip-us.apache.org/repos/asf/beam/blob/b8035ae7/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 764fa5f..ee07abb 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.CheckpointingMode; /** * Options which can be used to configure a Flink PipelineRunner. @@ -70,6 +71,11 @@ public interface FlinkPipelineOptions Long getCheckpointingInterval(); void setCheckpointingInterval(Long interval); + @Description("The checkpointing mode that defines consistency guarantee.") + @Default.Enum("AT_LEAST_ONCE") + CheckpointingMode getCheckpointingMode(); + void setCheckpointingMode(CheckpointingMode mode); + @Description("Sets the number of times that failed tasks are re-executed. " + "A value of zero effectively disables fault tolerance. A value of -1 indicates " + "that the system default value (as defined in the configuration) should be used.")