Repository: zeppelin Updated Branches: refs/heads/master e1f20ddda -> 1b9c46dcf
ZEPPELIN-1263. Should specify zeppelin's spark configuration through --conf arguments of spark-submit ### What is this PR for? For now we set spark configuration at runtime rather than passing it through `--conf`, which causes several issues. - Some configuration has to be set through --conf, otherwise we need to duplicate code in SparkSubmit.scala (spark.yarn.keytab, spark.yarn.principal) - Some configuration would conflict with spark-defaults.conf. If you specify spark.master as yarn-client in spark-defaults.conf but specify spark.master as local on the zeppelin side, you will see the spark interpreter fail to start due to this inconsistency. - As ZEPPELIN-1460 described, it is hard to figure out what the effective configuration is. - We cannot use yarn-cluster mode; although it is not supported now, I think it will be necessary as zeppelin needs to support multiple users. This PR would pass all the spark related configuration to spark-submit through `--conf`, so that it is easy to know and guarantee that configuration in the zeppelin interpreter setting takes precedence over spark-defaults.conf. And it is also good for maintenance that upstream changes (any change to configuration in spark) would not affect us. ### What type of PR is it? [Improvement] ### Todos - [ ] - Task ### What is the Jira issue? - https://issues.apache.org/jira/browse/ZEPPELIN-1263 ### How should this be tested? Tested spark 1.6 and spark 2.0 in both yarn-client mode and embedded mode. ### Screenshots (if appropriate) ![image](https://cloud.githubusercontent.com/assets/164491/18702212/3e7b54d0-8013-11e6-95f7-502b3cf89d67.png) ### Questions: - Do the license files need an update? No - Are there breaking changes for older versions? No - Does this need documentation? No … Author: Jeff Zhang <zjf...@apache.org> Closes #1446 from zjffdu/ZEPPELIN-1263 and squashes the following commits: f57d7bb [Jeff Zhang] address comments 151e991 [Jeff Zhang] ZEPPELIN-1263. 
Should specify zeppelin's spark configuration through --conf arguments of spark-submit Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/1b9c46dc Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/1b9c46dc Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/1b9c46dc Branch: refs/heads/master Commit: 1b9c46dcfd2bec6e0c1a2d4f1aa41d2834d23c77 Parents: e1f20dd Author: Jeff Zhang <zjf...@apache.org> Authored: Mon Jul 17 13:26:15 2017 +0800 Committer: Jeff Zhang <zjf...@apache.org> Committed: Sat Jul 22 20:30:44 2017 +0800 ---------------------------------------------------------------------- bin/interpreter.sh | 4 +-- .../apache/zeppelin/spark/SparkInterpreter.java | 25 +++++--------- .../interpreter/remote/RemoteInterpreter.java | 36 +++++++++++++++----- 3 files changed, 38 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1b9c46dc/bin/interpreter.sh ---------------------------------------------------------------------- diff --git a/bin/interpreter.sh b/bin/interpreter.sh index 29d0221..1344e31 100755 --- a/bin/interpreter.sh +++ b/bin/interpreter.sh @@ -202,9 +202,9 @@ fi if [[ -n "${SPARK_SUBMIT}" ]]; then if [[ -n "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ "$ZEPPELIN_IMPERSONATE_SPARK_PROXY_USER" != "false" ]]; then - INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} --proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${PORT}` + INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} 
--proxy-user ${ZEPPELIN_IMPERSONATE_USER} ${SPARK_APP_JAR} ${PORT}` else - INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${SPARK_APP_JAR} ${PORT}` + INTERPRETER_RUN_COMMAND+=' '` echo ${SPARK_SUBMIT} --class ${ZEPPELIN_SERVER} --driver-class-path \"${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH}\" --driver-java-options \"${JAVA_INTP_OPTS}\" ${SPARK_SUBMIT_OPTIONS} ${ZEPPELIN_SPARK_CONF} ${SPARK_APP_JAR} ${PORT}` fi else INTERPRETER_RUN_COMMAND+=' '` echo ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} ${ZEPPELIN_INTP_MEM} -cp ${ZEPPELIN_INTP_CLASSPATH_OVERRIDES}:${ZEPPELIN_INTP_CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} ` http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1b9c46dc/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index dd1fa11..df41014 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -359,10 +359,10 @@ public class SparkInterpreter extends Interpreter { * Create SparkSession */ public Object createSparkSession() { - logger.info("------ Create new SparkContext {} -------", getProperty("master")); + // use local mode for embedded spark mode when spark.master is not found + conf.setIfMissing("spark.master", "local"); + logger.info("------ Create new SparkSession {} -------", conf.get("spark.master")); String execUri = System.getenv("SPARK_EXECUTOR_URI"); - conf.setAppName(getProperty("spark.app.name")); - if (outputDir != null) { conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath()); } @@ -370,11 +370,6 @@ 
public class SparkInterpreter extends Interpreter { if (execUri != null) { conf.set("spark.executor.uri", execUri); } - - if (System.getenv("SPARK_HOME") != null) { - conf.setSparkHome(System.getenv("SPARK_HOME")); - } - conf.set("spark.scheduler.mode", "FAIR"); conf.setMaster(getProperty("master")); if (isYarnMode()) { @@ -383,7 +378,6 @@ public class SparkInterpreter extends Interpreter { } Properties intpProperty = getProperty(); - for (Object k : intpProperty.keySet()) { String key = (String) k; String val = toString(intpProperty.get(key)); @@ -436,7 +430,11 @@ public class SparkInterpreter extends Interpreter { } public SparkContext createSparkContext_1() { - logger.info("------ Create new SparkContext {} -------", getProperty("master")); + // use local mode for embedded spark mode when spark.master is not found + if (!conf.contains("spark.master")) { + conf.setMaster("local"); + } + logger.info("------ Create new SparkContext {} -------", conf.get("spark.master")); String execUri = System.getenv("SPARK_EXECUTOR_URI"); String[] jars = null; @@ -490,9 +488,6 @@ public class SparkInterpreter extends Interpreter { classServerUri = (String) Utils.invokeMethod(classServer, "uri"); } - conf.setMaster(getProperty("master")) - .setAppName(getProperty("spark.app.name")); - if (classServerUri != null) { conf.set("spark.repl.class.uri", classServerUri); } @@ -508,13 +503,9 @@ public class SparkInterpreter extends Interpreter { if (execUri != null) { conf.set("spark.executor.uri", execUri); } - if (System.getenv("SPARK_HOME") != null) { - conf.setSparkHome(System.getenv("SPARK_HOME")); - } conf.set("spark.scheduler.mode", "FAIR"); Properties intpProperty = getProperty(); - for (Object k : intpProperty.keySet()) { String key = (String) k; String val = toString(intpProperty.get(key)); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/1b9c46dc/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java 
---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 9cea693..847153e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.remote; import java.util.*; +import org.apache.commons.lang3.StringUtils; import org.apache.thrift.TException; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; @@ -140,15 +141,38 @@ public class RemoteInterpreter extends Interpreter { } private Map<String, String> getEnvFromInterpreterProperty(Properties property) { - Map<String, String> env = new HashMap<>(); - for (Object key : property.keySet()) { - if (RemoteInterpreterUtils.isEnvString((String) key)) { - env.put((String) key, property.getProperty((String) key)); + Map<String, String> env = new HashMap<String, String>(); + StringBuilder sparkConfBuilder = new StringBuilder(); + for (String key : property.stringPropertyNames()) { + if (RemoteInterpreterUtils.isEnvString(key)) { + env.put(key, property.getProperty(key)); + } + if (key.equals("master")) { + sparkConfBuilder.append(" --master " + property.getProperty("master")); + } + if (isSparkConf(key, property.getProperty(key))) { + sparkConfBuilder.append(" --conf " + key + "=\"" + + toShellFormat(property.getProperty(key)) + "\""); } } + env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString()); return env; } + private String toShellFormat(String value) { + if (value.contains("\'") && value.contains("\"")) { + throw new RuntimeException("Spark property value could not contain both \" and '"); + } else if (value.contains("\'")) { + return "\"" + value + 
"\""; + } else { + return "\'" + value + "\'"; + } + } + + static boolean isSparkConf(String key, String value) { + return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value); + } + @Override public String getClassName() { return className; @@ -559,10 +583,6 @@ public class RemoteInterpreter extends Interpreter { return env; } - public void setEnv(Map<String, String> env) { - this.env = env; - } - public void addEnv(Map<String, String> env) { if (this.env == null) { this.env = new HashMap<>();