Repository: incubator-beam
Updated Branches:
  refs/heads/master f62d04e22 -> 843275210


[BEAM-642] Support Flink Detached Mode for JOB execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dc69bc48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dc69bc48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dc69bc48

Branch: refs/heads/master
Commit: dc69bc48b0057f45d849d3cfec848fa066ee0854
Parents: f62d04e
Author: Sumit Chawla <sumic...@cisco.com>
Authored: Mon Sep 19 15:10:53 2016 -0700
Committer: Maximilian Michels <m...@apache.org>
Committed: Thu Sep 22 11:30:09 2016 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/flink/FlinkRunner.java  | 25 +++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc69bc48/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index d3c65c0..137fdeb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -25,6 +25,7 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -56,6 +57,7 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.client.program.DetachedEnvironment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,18 +153,23 @@ public class FlinkRunner extends PipelineRunner<FlinkRunnerResult> {
       throw new RuntimeException("Pipeline execution failed", e);
     }
 
-    LOG.info("Execution finished in {} msecs", result.getNetRuntime());
-
-    Map<String, Object> accumulators = result.getAllAccumulatorResults();
-    if (accumulators != null && !accumulators.isEmpty()) {
-      LOG.info("Final aggregator values:");
+    if (result instanceof DetachedEnvironment.DetachedJobExecutionResult) {
+      LOG.info("Pipeline submitted in Detached mode");
+      Map<String, Object> accumulators = Collections.emptyMap();
+      return new FlinkRunnerResult(accumulators, -1L);
+    } else {
+      LOG.info("Execution finished in {} msecs", result.getNetRuntime());
+      Map<String, Object> accumulators = result.getAllAccumulatorResults();
+      if (accumulators != null && !accumulators.isEmpty()) {
+        LOG.info("Final aggregator values:");
 
-      for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
-        LOG.info("{} : {}", entry.getKey(), entry.getValue());
+        for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
+          LOG.info("{} : {}", entry.getKey(), entry.getValue());
+        }
       }
-    }
 
-    return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+      return new FlinkRunnerResult(accumulators, result.getNetRuntime());
+    }
   }
 
   /**

Reply via email to