Repository: incubator-beam Updated Branches: refs/heads/master 3b2e0290d -> c53e0b162
[BEAM-1095] Add support for configuring object reuse on Flink Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1b125207 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1b125207 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1b125207 Branch: refs/heads/master Commit: 1b1252074dd6b57f4fb88ceb82c704d3d3d8147f Parents: 3b2e029 Author: Alexey Diomin <diomi...@gmail.com> Authored: Wed Dec 7 09:39:27 2016 +0400 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Dec 8 09:44:00 2016 +0800 ---------------------------------------------------------------------- .../flink/FlinkPipelineExecutionEnvironment.java | 12 ++++++++++++ .../apache/beam/runners/flink/FlinkPipelineOptions.java | 5 +++++ 2 files changed, 17 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b125207/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index 391c3f2..69dcd5e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -159,6 +159,12 @@ public class FlinkPipelineExecutionEnvironment { // set parallelism in the options (required by some execution code) options.setParallelism(flinkBatchEnv.getParallelism()); + if (options.getObjectReuse()) { + flinkBatchEnv.getConfig().enableObjectReuse(); + } else { + flinkBatchEnv.getConfig().disableObjectReuse(); + } + 
return flinkBatchEnv; } @@ -197,6 +203,12 @@ public class FlinkPipelineExecutionEnvironment { // set parallelism in the options (required by some execution code) options.setParallelism(flinkStreamEnv.getParallelism()); + if (options.getObjectReuse()) { + flinkStreamEnv.getConfig().enableObjectReuse(); + } else { + flinkStreamEnv.getConfig().disableObjectReuse(); + } + // default to event time flinkStreamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1b125207/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index be99f29..3bb358e 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -83,6 +83,11 @@ public interface FlinkPipelineOptions Long getExecutionRetryDelay(); void setExecutionRetryDelay(Long delay); + @Description("Sets the behavior of reusing objects.") + @Default.Boolean(false) + Boolean getObjectReuse(); + void setObjectReuse(Boolean reuse); + /** * Sets a state backend to store Beam's state during computation. * Note: Only applicable when executing in streaming mode.