Repository: zeppelin
Updated Branches:
  refs/heads/master e1f20ddda -> 1b9c46dcf


ZEPPELIN-1263. Should specify zeppelin's spark configuration through --conf 
arguments of spark-submit

### What is this PR for?

Currently we set the Spark configuration at runtime rather than passing it 
through `--conf`, which causes several issues.
- Some configuration has to be set through `--conf`; otherwise we would need to 
duplicate code from SparkSubmit.scala (e.g. spark.yarn.keytab, spark.yarn.principal).
- Some configuration can conflict with spark-defaults.conf. If you set 
spark.master to yarn-client in spark-defaults.conf but set spark.master to 
local on the Zeppelin side, the Spark interpreter fails to start because of 
this inconsistency.
- As described in ZEPPELIN-1460, it is hard to figure out what the effective 
configuration is.
- We cannot use yarn-cluster mode. It is not supported yet, but I think 
supporting it is necessary because Zeppelin needs to support multiple users.

This PR passes all Spark-related configuration to spark-submit through 
`--conf`, so that it is easy to know, and to guarantee, that the configuration in 
the Zeppelin interpreter setting takes precedence over spark-defaults.conf. It 
is also better for maintenance: upstream changes (any change to configuration 
handling in Spark) will not affect us.
### What type of PR is it?

[Improvement]
### Todos
- [ ] - Task
### What is the Jira issue?
- https://issues.apache.org/jira/browse/ZEPPELIN-1263
### How should this be tested?

Tested with Spark 1.6 and Spark 2.0, in both yarn-client mode and embedded mode.
### Screenshots (if appropriate)

![image](https://cloud.githubusercontent.com/assets/164491/18702212/3e7b54d0-8013-11e6-95f7-502b3cf89d67.png)
### Questions:
- Do the license files need an update? No
- Are there breaking changes for older versions? No
- Does this need documentation? No

…

Author: Jeff Zhang <zjf...@apache.org>

Closes #1446 from zjffdu/ZEPPELIN-1263 and squashes the following commits:

f57d7bb [Jeff Zhang] address comments
151e991 [Jeff Zhang] ZEPPELIN-1263. Should specify zeppelin's spark 
configuration through --conf arguments of spark-submit


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1b9c46dc
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1b9c46dc
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1b9c46dc

Branch: refs/heads/master
Commit: 1b9c46dcfd2bec6e0c1a2d4f1aa41d2834d23c77
Parents: e1f20dd
Author: Jeff Zhang <zjf...@apache.org>
Authored: Mon Jul 17 13:26:15 2017 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Sat Jul 22 20:30:44 2017 +0800

----------------------------------------------------------------------
 bin/interpreter.sh                              |  4 +--
 .../apache/zeppelin/spark/SparkInterpreter.java | 25 +++++---------
 .../interpreter/remote/RemoteInterpreter.java   | 36 +++++++++++++++-----
 3 files changed, 38 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1b9c46dc/bin/interpreter.sh
----------------------------------------------------------------------
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index 29d0221..1344e31 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -202,9 +202,9 @@ fi
 
 if [[ -n "${SPARK_SUBMIT}" ]]; then
     if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ 
"$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]];  then
-       INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class 
${ZEPPELIN_SERVER} --driver-class-path 
\"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" 
--driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} 
--proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${PORT}`
+       INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class 
${ZEPPELIN_SERVER} --driver-class-path 
\"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" 
--driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} 
${ZEPPELIN_SPARK_CONF} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} 
${SPARK_APP_JAR} ${PORT}`
     else
-       INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class 
${ZEPPELIN_SERVER} --driver-class-path 
\"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" 
--driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} 
${SPARK_APP_JAR} ${PORT}`
+       INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class 
${ZEPPELIN_SERVER} --driver-class-path 
\"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" 
--driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} 
${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${PORT}`
     fi
 else
     INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} 
${ZEPPELIN_INTP_MEM} -cp 
${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} 
${ZEPPELIN_SERVER} ${PORT} `

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1b9c46dc/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index dd1fa11..df41014 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -359,10 +359,10 @@ public class SparkInterpreter extends Interpreter {
    * Create SparkSession
    */
   public Object createSparkSession() {
-    logger.info("------ Create new SparkContext {} -------", 
getProperty("master"));
+    // use local mode for embedded spark mode when spark.master is not found
+    conf.setIfMissing("spark.master", "local");
+    logger.info("------ Create new SparkSession {} -------", 
conf.get("spark.master"));
     String execUri = System.getenv("SPARK_EXECUTOR_URI");
-    conf.setAppName(getProperty("spark.app.name"));
-
     if (outputDir != null) {
       conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath());
     }
@@ -370,11 +370,6 @@ public class SparkInterpreter extends Interpreter {
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri);
     }
-
-    if (System.getenv("SPARK_HOME") != null) {
-      conf.setSparkHome(System.getenv("SPARK_HOME"));
-    }
-
     conf.set("spark.scheduler.mode", "FAIR");
     conf.setMaster(getProperty("master"));
     if (isYarnMode()) {
@@ -383,7 +378,6 @@ public class SparkInterpreter extends Interpreter {
     }
 
     Properties intpProperty = getProperty();
-
     for (Object k : intpProperty.keySet()) {
       String key = (String) k;
       String val = toString(intpProperty.get(key));
@@ -436,7 +430,11 @@ public class SparkInterpreter extends Interpreter {
   }
 
   public SparkContext createSparkContext_1() {
-    logger.info("------ Create new SparkContext {} -------", 
getProperty("master"));
+    // use local mode for embedded spark mode when spark.master is not found
+    if (!conf.contains("spark.master")) {
+      conf.setMaster("local");
+    }
+    logger.info("------ Create new SparkContext {} -------", 
conf.get("spark.master"));
 
     String execUri = System.getenv("SPARK_EXECUTOR_URI");
     String[] jars = null;
@@ -490,9 +488,6 @@ public class SparkInterpreter extends Interpreter {
       classServerUri = (String) Utils.invokeMethod(classServer, "uri");
     }
 
-    conf.setMaster(getProperty("master"))
-        .setAppName(getProperty("spark.app.name"));
-
     if (classServerUri != null) {
       conf.set("spark.repl.class.uri", classServerUri);
     }
@@ -508,13 +503,9 @@ public class SparkInterpreter extends Interpreter {
     if (execUri != null) {
       conf.set("spark.executor.uri", execUri);
     }
-    if (System.getenv("SPARK_HOME") != null) {
-      conf.setSparkHome(System.getenv("SPARK_HOME"));
-    }
     conf.set("spark.scheduler.mode", "FAIR");
 
     Properties intpProperty = getProperty();
-
     for (Object k : intpProperty.keySet()) {
       String key = (String) k;
       String val = toString(intpProperty.get(key));

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1b9c46dc/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index 9cea693..847153e 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.remote;
 
 import java.util.*;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.thrift.TException;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
@@ -140,15 +141,38 @@ public class RemoteInterpreter extends Interpreter {
   }
 
   private Map<String, String> getEnvFromInterpreterProperty(Properties 
property) {
-    Map<String, String> env = new HashMap<>();
-    for (Object key : property.keySet()) {
-      if (RemoteInterpreterUtils.isEnvString((String) key)) {
-        env.put((String) key, property.getProperty((String) key));
+    Map<String, String> env = new HashMap<String, String>();
+    StringBuilder sparkConfBuilder = new StringBuilder();
+    for (String key : property.stringPropertyNames()) {
+      if (RemoteInterpreterUtils.isEnvString(key)) {
+        env.put(key, property.getProperty(key));
+      }
+      if (key.equals("master")) {
+        sparkConfBuilder.append(" --master " + property.getProperty("master"));
+      }
+      if (isSparkConf(key, property.getProperty(key))) {
+        sparkConfBuilder.append(" --conf " + key + "=\"" +
+            toShellFormat(property.getProperty(key)) + "\"");
       }
     }
+    env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
     return env;
   }
 
+  private String toShellFormat(String value) {
+    if (value.contains("\'") && value.contains("\"")) {
+      throw new RuntimeException("Spark property value could not contain both 
\" and '");
+    } else if (value.contains("\'")) {
+      return "\"" + value + "\"";
+    } else {
+      return "\'" + value + "\'";
+    }
+  }
+
+  static boolean isSparkConf(String key, String value) {
+    return !StringUtils.isEmpty(key) && key.startsWith("spark.") && 
!StringUtils.isEmpty(value);
+  }
+
   @Override
   public String getClassName() {
     return className;
@@ -559,10 +583,6 @@ public class RemoteInterpreter extends Interpreter {
     return env;
   }
 
-  public void setEnv(Map<String, String> env) {
-    this.env = env;
-  }
-
   public void addEnv(Map<String, String> env) {
     if (this.env == null) {
       this.env = new HashMap<>();

Reply via email to