Repository: incubator-beam
Updated Branches:
  refs/heads/master 3b2e0290d -> c53e0b162


[BEAM-1095] Add support for configuring object reuse on Flink


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b125207
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b125207
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b125207

Branch: refs/heads/master
Commit: 1b1252074dd6b57f4fb88ceb82c704d3d3d8147f
Parents: 3b2e029
Author: Alexey Diomin <diomi...@gmail.com>
Authored: Wed Dec 7 09:39:27 2016 +0400
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Dec 8 09:44:00 2016 +0800

----------------------------------------------------------------------
 .../flink/FlinkPipelineExecutionEnvironment.java        | 12 ++++++++++++
 .../apache/beam/runners/flink/FlinkPipelineOptions.java |  5 +++++
 2 files changed, 17 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b125207/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
index 391c3f2..69dcd5e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
@@ -159,6 +159,12 @@ public class FlinkPipelineExecutionEnvironment {
     // set parallelism in the options (required by some execution code)
     options.setParallelism(flinkBatchEnv.getParallelism());
 
+    if (options.getObjectReuse()) {
+      flinkBatchEnv.getConfig().enableObjectReuse();
+    } else {
+      flinkBatchEnv.getConfig().disableObjectReuse();
+    }
+
     return flinkBatchEnv;
   }
 
@@ -197,6 +203,12 @@ public class FlinkPipelineExecutionEnvironment {
     // set parallelism in the options (required by some execution code)
     options.setParallelism(flinkStreamEnv.getParallelism());
 
+    if (options.getObjectReuse()) {
+      flinkStreamEnv.getConfig().enableObjectReuse();
+    } else {
+      flinkStreamEnv.getConfig().disableObjectReuse();
+    }
+
     // default to event time
     flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b125207/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index be99f29..3bb358e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -83,6 +83,11 @@ public interface FlinkPipelineOptions
   Long getExecutionRetryDelay();
   void setExecutionRetryDelay(Long delay);
 
+  @Description("Sets the behavior of reusing objects.")
+  @Default.Boolean(false)
+  Boolean getObjectReuse();
+  void setObjectReuse(Boolean reuse);
+
   /**
    * Sets a state backend to store Beam's state during computation.
    * Note: Only applicable when executing in streaming mode.

Reply via email to