[BEAM-3206] Shut down executor when spark runner finishes

The Spark runner previously left the JVM process hanging after
completion because its one-time use executor service was never shut
down. This change shuts down the executor after jobs have been
submitted, allowing graceful JVM termination.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5b27333
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5b27333
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5b27333

Branch: refs/heads/tez-runner
Commit: b5b2733338e7a0d5dd373b7a19bea315b3b1c692
Parents: 0df7ba9
Author: Ben Sidhom <sid...@google.com>
Authored: Wed Nov 15 16:05:49 2017 -0800
Committer: Ismaël Mejía <ieme...@gmail.com>
Committed: Fri Nov 17 15:15:08 2017 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/runners/spark/SparkRunner.java   | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b5b27333/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 98ca1be..4a409cb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -190,6 +190,7 @@ public final class SparkRunner extends 
PipelineRunner<SparkPipelineResult> {
                   jssc.start();
                 }
               });
+      executorService.shutdown();
 
       result = new SparkPipelineResult.StreamingMode(startPipeline, jssc);
     } else {
@@ -214,6 +215,7 @@ public final class SparkRunner extends 
PipelineRunner<SparkPipelineResult> {
                   LOG.info("Batch pipeline execution complete.");
                 }
               });
+      executorService.shutdown();
 
       result = new SparkPipelineResult.BatchMode(startPipeline, jsc);
     }

Reply via email to