This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b67847  [BEAM-7110] Add Spark master option to SparkJobServerDriver
     new 81faf35  Merge pull request #8379 from ibzib/spark-master2
4b67847 is described below

commit 4b67847193da92cc8e59d37b0539bfb2bc6ab37f
Author: Kyle Weaver <kcwea...@google.com>
AuthorDate: Thu Apr 18 12:54:03 2019 -0700

    [BEAM-7110] Add Spark master option to SparkJobServerDriver
---
 runners/spark/job-server/build.gradle              |  2 ++
 .../apache/beam/runners/spark/SparkJobInvoker.java | 14 ++++++++++---
 .../beam/runners/spark/SparkJobServerDriver.java   | 23 +++++++++++++++++-----
 .../beam/runners/spark/SparkPipelineOptions.java   |  4 +++-
 4 files changed, 34 insertions(+), 9 deletions(-)

diff --git a/runners/spark/job-server/build.gradle b/runners/spark/job-server/build.gradle
index 2ce34fe..7ebde87 100644
--- a/runners/spark/job-server/build.gradle
+++ b/runners/spark/job-server/build.gradle
@@ -70,6 +70,8 @@ runShadow {
     args += ["--artifacts-dir=${project.property('artifactsDir')}"]
   if (project.hasProperty('cleanArtifactsPerJob'))
     args += ["--clean-artifacts-per-job=${project.property('cleanArtifactsPerJob')}"]
+  if (project.hasProperty('sparkMasterUrl'))
+    args += ["--spark-master-url=${project.property('sparkMasterUrl')}"]
 
   // Enable remote debugging.
   jvmArgs = ["-Xdebug", "-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005"]
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
index e47c851..da35ae2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobInvoker.java
@@ -34,12 +34,16 @@ public class SparkJobInvoker extends JobInvoker {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkJobInvoker.class);
 
-  public static SparkJobInvoker create() {
-    return new SparkJobInvoker();
+  private SparkJobServerDriver.SparkServerConfiguration configuration;
+
+  public static SparkJobInvoker create(
+      SparkJobServerDriver.SparkServerConfiguration configuration) {
+    return new SparkJobInvoker(configuration);
   }
 
-  private SparkJobInvoker() {
+  private SparkJobInvoker(SparkJobServerDriver.SparkServerConfiguration configuration) {
     super("spark-runner-job-invoker");
+    this.configuration = configuration;
   }
 
   @Override
@@ -56,6 +60,10 @@ public class SparkJobInvoker extends JobInvoker {
         String.format("%s_%s", sparkOptions.getJobName(), UUID.randomUUID().toString());
     LOG.info("Invoking job {}", invocationId);
 
+    if (sparkOptions.getSparkMaster().equals(SparkPipelineOptions.DEFAULT_MASTER_URL)) {
+      sparkOptions.setSparkMaster(configuration.getSparkMasterUrl());
+    }
+
     // Options can't be translated to proto if runner class is unresolvable, so set it to null.
     sparkOptions.setRunner(null);
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java
index 387907f..0589045 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkJobServerDriver.java
@@ -24,6 +24,7 @@ import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.kohsuke.args4j.CmdLineException;
 import org.kohsuke.args4j.CmdLineParser;
+import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,11 +33,23 @@ public class SparkJobServerDriver extends JobServerDriver {
 
   @Override
   protected JobInvoker createJobInvoker() {
-    return SparkJobInvoker.create();
+    return SparkJobInvoker.create((SparkServerConfiguration) configuration);
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkJobServerDriver.class);
 
+  /** Spark runner-specific Configuration for the jobServer. */
+  public static class SparkServerConfiguration extends ServerConfiguration {
+    @Option(
+        name = "--spark-master-url",
+        usage = "Spark master url to submit job (e.g. spark://host:port, local[4])")
+    private String sparkMasterUrl = SparkPipelineOptions.DEFAULT_MASTER_URL;
+
+    String getSparkMasterUrl() {
+      return this.sparkMasterUrl;
+    }
+  }
+
   public static void main(String[] args) {
     FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
     fromParams(args).run();
@@ -50,7 +63,7 @@ public class SparkJobServerDriver extends JobServerDriver {
   }
 
   private static SparkJobServerDriver fromParams(String[] args) {
-    ServerConfiguration configuration = new ServerConfiguration();
+    SparkServerConfiguration configuration = new SparkServerConfiguration();
     CmdLineParser parser = new CmdLineParser(configuration);
     try {
       parser.parseArgument(args);
@@ -63,7 +76,7 @@ public class SparkJobServerDriver extends JobServerDriver {
     return fromConfig(configuration);
   }
 
-  private static SparkJobServerDriver fromConfig(ServerConfiguration configuration) {
+  private static SparkJobServerDriver fromConfig(SparkServerConfiguration configuration) {
     return create(
         configuration,
         createJobServerFactory(configuration),
@@ -71,14 +84,14 @@ public class SparkJobServerDriver extends JobServerDriver {
   }
 
   private static SparkJobServerDriver create(
-      ServerConfiguration configuration,
+      SparkServerConfiguration configuration,
       ServerFactory jobServerFactory,
       ServerFactory artifactServerFactory) {
     return new SparkJobServerDriver(configuration, jobServerFactory, artifactServerFactory);
   }
 
   private SparkJobServerDriver(
-      ServerConfiguration configuration,
+      SparkServerConfiguration configuration,
       ServerFactory jobServerFactory,
       ServerFactory artifactServerFactory) {
     super(configuration, jobServerFactory, artifactServerFactory);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 6935b54..4bf51e8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -33,8 +33,10 @@ import org.apache.beam.sdk.options.StreamingOptions;
 public interface SparkPipelineOptions
     extends PipelineOptions, StreamingOptions, ApplicationNameOptions {
 
+  String DEFAULT_MASTER_URL = "local[4]";
+
   @Description("The url of the spark master to connect to, (e.g. spark://host:port, local[4]).")
-  @Default.String("local[4]")
+  @Default.String(DEFAULT_MASTER_URL)
   String getSparkMaster();
 
   void setSparkMaster(String master);

Reply via email to