Flink runner: specify CheckpointingMode through PipelineOptions.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b8035ae7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b8035ae7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b8035ae7

Branch: refs/heads/master
Commit: b8035ae7ad226bd0261a70fb8e0041e0f07e6dfe
Parents: 1866a01
Author: Pei He <p...@apache.org>
Authored: Sat May 27 14:41:26 2017 +0800
Committer: Pei He <hepei...@alibaba-inc.com>
Committed: Sun Jun 4 16:18:36 2017 +0800

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkPipelineExecutionEnvironment.java  | 2 +-
 .../org/apache/beam/runners/flink/FlinkPipelineOptions.java    | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b8035ae7/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 7765a00..98f7c5a 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -227,7 +227,7 @@ class FlinkPipelineExecutionEnvironment {
       if (checkpointInterval < 1) {
         throw new IllegalArgumentException("The checkpoint interval must be 
positive");
       }
-      flinkStreamEnv.enableCheckpointing(checkpointInterval);
+      flinkStreamEnv.enableCheckpointing(checkpointInterval, 
options.getCheckpointingMode());
       boolean externalizedCheckpoint = 
options.isExternalizedCheckpointsEnabled();
       boolean retainOnCancellation = 
options.getRetainExternalizedCheckpointsOnCancellation();
       if (externalizedCheckpoint) {

http://git-wip-us.apache.org/repos/asf/beam/blob/b8035ae7/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index 764fa5f..ee07abb 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
 
 /**
  * Options which can be used to configure a Flink PipelineRunner.
@@ -70,6 +71,11 @@ public interface FlinkPipelineOptions
   Long getCheckpointingInterval();
   void setCheckpointingInterval(Long interval);
 
+  @Description("The checkpointing mode that defines consistency guarantee.")
+  @Default.Enum("AT_LEAST_ONCE")
+  CheckpointingMode getCheckpointingMode();
+  void setCheckpointingMode(CheckpointingMode mode);
+
   @Description("Sets the number of times that failed tasks are re-executed. "
       + "A value of zero effectively disables fault tolerance. A value of -1 
indicates "
       + "that the system default value (as defined in the configuration) 
should be used.")

Reply via email to