Repository: oozie Updated Branches: refs/heads/master 438ba6df7 -> 5b21530aa
OOZIE-3228 [Spark action] Can't load properties from spark-defaults.conf (andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/5b21530a Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/5b21530a Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/5b21530a Branch: refs/heads/master Commit: 5b21530aaee04f7f3af8e43af7f9ccd01935f589 Parents: 438ba6d Author: Andras Piros <andras.pi...@cloudera.com> Authored: Wed May 16 16:19:36 2018 +0200 Committer: Andras Piros <andras.pi...@cloudera.com> Committed: Wed May 16 16:19:36 2018 +0200 ---------------------------------------------------------------------- .../site/twiki/DG_SparkActionExtension.twiki | 17 ++++-- release-log.txt | 1 + .../oozie/action/hadoop/SparkArgsExtractor.java | 58 +++++++++++++++++++- .../action/hadoop/TestSparkArgsExtractor.java | 53 +++++++++++++++++- 4 files changed, 121 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/5b21530a/docs/src/site/twiki/DG_SparkActionExtension.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki index 66076ff..ce80e45 100644 --- a/docs/src/site/twiki/DG_SparkActionExtension.twiki +++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki @@ -109,10 +109,10 @@ Some examples of the =spark-opts= element: * '--conf key=value --verbose --properties-file user.properties' There are several ways to define properties that will be passed to Spark. 
They are processed in the following order: - * propagated from =oozie.service.SparkConfigurationService.spark.configurations= - * read from a localized =spark-defaults.conf= file - * read from a file defined in =spark-opts= via the =--properties-file= - * properties defined in =spark-opts= element + * propagated from =oozie.service.SparkConfigurationService.spark.configurations= + * read from a localized =spark-defaults.conf= file + * read from a file defined in =spark-opts= via the =--properties-file= + * properties defined in =spark-opts= element (The latter takes precedence over the former.) The server propagated properties, the =spark-defaults.conf= and the user-defined properties file are merged together into a @@ -120,6 +120,15 @@ single properties file as Spark handles only one file in its =--properties-file= The =arg= element if present, contains arguments that can be passed to spark application. +In case some property values are present both in =spark-defaults.conf= and as property key/value pairs generated by Oozie, the user +configured values from =spark-defaults.conf= are prepended to the ones generated by Oozie, as part of the Spark arguments list. + +The following properties are prepended to the Spark arguments: + * =spark.executor.extraClassPath= + * =spark.driver.extraClassPath= + * =spark.executor.extraJavaOptions= + * =spark.driver.extraJavaOptions= + All the above elements can be parameterized (templatized) using EL expressions.
http://git-wip-us.apache.org/repos/asf/oozie/blob/5b21530a/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 267af2a..a89b714 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3228 [Spark action] Can't load properties from spark-defaults.conf (andras.piros) OOZIE-3250 Reduce heap waste by reducing duplicate byte[] count (andras.piros) OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti) OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti) http://git-wip-us.apache.org/repos/asf/oozie/blob/5b21530a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java index 5af39cd..28d9c5c 100644 --- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java +++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java @@ -42,6 +42,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.ListIterator; import java.util.Map; import java.util.Properties; import java.util.regex.Pattern; @@ -76,6 +77,8 @@ class SparkArgsExtractor { private static final String SPARK_YARN_JARS = "spark.yarn.jars"; private static final String OPT_SEPARATOR = "="; private static final String OPT_VALUE_SEPARATOR = ","; + private static final String SPARK_OPT_SEPARATOR = ":"; + private static final String JAVA_OPT_SEPARATOR = " "; private static final String CONF_OPTION = "--conf"; private static final String MASTER_OPTION_YARN_CLUSTER = "yarn-cluster"; private static final String 
MASTER_OPTION_YARN_CLIENT = "yarn-client"; @@ -84,7 +87,7 @@ class SparkArgsExtractor { private static final String DEPLOY_MODE_CLIENT = "client"; private static final String SPARK_YARN_TAGS = "spark.yarn.tags"; private static final String OPT_PROPERTIES_FILE = "--properties-file"; - public static final String SPARK_DEFAULTS_GENERATED_PROPERTIES = "spark-defaults-oozie-generated.properties"; + static final String SPARK_DEFAULTS_GENERATED_PROPERTIES = "spark-defaults-oozie-generated.properties"; private boolean pySpark = false; private final Configuration actionConf; @@ -351,6 +354,8 @@ class SparkArgsExtractor { if (persisted) { sparkArgs.add(OPT_PROPERTIES_FILE); sparkArgs.add(SPARK_DEFAULTS_GENERATED_PROPERTIES); + + checkPropertiesAndPrependArgs(properties, sparkArgs); } } @@ -371,6 +376,57 @@ class SparkArgsExtractor { return false; } + /** + * In case some property values are present both in {@code spark-defaults.conf} and as property key/value pairs generated by + * Oozie, prepend the user configured values from {@code spark-defaults.conf} to the ones generated by Oozie, as part of the + * Spark arguments list. Users don't want to lose the configured values, and we don't want to lose the generated ones, either. 
+ * <p> + * Following properties to prepend to Spark arguments: + * <ul> + * <li>{@code spark.executor.extraClassPath}</li> + * <li>{@code spark.driver.extraClassPath}</li> + * <li>{@code spark.executor.extraJavaOptions}</li> + * <li>{@code spark.driver.extraJavaOptions}</li> + * </ul> + * @param source {@link Properties} defined in {@code spark-defaults.conf} by the user + * @param target Spark options + */ + private void checkPropertiesAndPrependArgs(final Properties source, final List<String> target) { + checkPropertiesAndPrependArg(EXECUTOR_CLASSPATH, SPARK_OPT_SEPARATOR, source, target); + checkPropertiesAndPrependArg(DRIVER_CLASSPATH, SPARK_OPT_SEPARATOR, source, target); + checkPropertiesAndPrependArg(EXECUTOR_EXTRA_JAVA_OPTIONS, JAVA_OPT_SEPARATOR, source, target); + checkPropertiesAndPrependArg(DRIVER_EXTRA_JAVA_OPTIONS, JAVA_OPT_SEPARATOR, source, target); + } + + /** + * Prepend one user defined property value from {@code spark-defaults.properties} to the Oozie generated value, and store to + * Spark options. 
+ * @param key key of the user defined property key/value pair + * @param separator user defined and generated values must be separated, depending on the context + * @param source {@link Properties} defined in {@code spark-defaults.conf} by the user + * @param target Spark options + */ + private void checkPropertiesAndPrependArg(final String key, + final String separator, + final Properties source, + final List<String> target) { + final String propertiesKey = key.replace(OPT_SEPARATOR, ""); + if (source.containsKey(propertiesKey)) { + final ListIterator<String> targetIterator = target.listIterator(); + while (targetIterator.hasNext()) { + final String arg = targetIterator.next(); + if (arg.startsWith(key)) { + final String valueToPrepend = source.getProperty(propertiesKey); + final String oldValue = arg.substring(arg.indexOf(key) + key.length()); + String newValue = valueToPrepend + separator + oldValue; + System.out.println(String.format("Spark argument to replace: [%s=%s]", propertiesKey, oldValue)); + targetIterator.set(key + newValue); + System.out.println(String.format("Spark argument replaced with: [%s=%s]", propertiesKey, newValue)); + } + } + } + } + private void loadUserDefinedPropertiesFile(final String userDefinedPropertiesFile, final Properties properties) { if (userDefinedPropertiesFile != null) { System.out.println(String.format("Reading Spark config from %s %s...", OPT_PROPERTIES_FILE, userDefinedPropertiesFile)); http://git-wip-us.apache.org/repos/asf/oozie/blob/5b21530a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java index 6a9baa5..d75e727 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java +++ 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java @@ -22,12 +22,12 @@ import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.junit.After; import org.junit.Test; -import scala.util.PropertiesTrait; import java.io.File; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -37,6 +37,8 @@ import static org.junit.Assert.assertEquals; public class TestSparkArgsExtractor { + private static final String SPARK_DEFAULTS_PROPERTIES = "spark-defaults.conf"; + @Test public void testAppendOoziePropertiesToSparkConf() throws Exception { final List<String> sparkArgs = new ArrayList<>(); @@ -307,6 +309,46 @@ public class TestSparkArgsExtractor { sparkArgs); } + @Test + public void testPropertiesArePrependedToSparkArgs() throws IOException, OozieActionConfiguratorException, URISyntaxException { + final Configuration actionConf = new Configuration(); + actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn"); + actionConf.set(SparkActionExecutor.SPARK_MODE, "client"); + actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy"); + actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File"); + actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar"); + + createTemporaryFileWithContent(SPARK_DEFAULTS_PROPERTIES, "spark.executor.extraClassPath=/etc/hbase/conf:/etc/hive/conf\n" + + "spark.driver.extraClassPath=/etc/hbase/conf:/etc/hive/conf\n" + + "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGC -XX:+UnlockExperimentalVMOptions\n" + + "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGC -XX:+UnlockExperimentalVMOptions"); + + final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(new String[0]); + + assertEquals("Spark args mismatch", + Lists.newArrayList("--master", "yarn", + 
"--deploy-mode", "client", + "--name", "Spark Copy File", + "--class", "org.apache.oozie.example.SparkFileCopy", + "--conf", "spark.executor.extraClassPath=/etc/hbase/conf:/etc/hive/conf:$PWD/*", + "--conf", "spark.driver.extraClassPath=/etc/hbase/conf:/etc/hive/conf:$PWD/*", + "--conf", "spark.yarn.security.tokens.hadoopfs.enabled=false", + "--conf", "spark.yarn.security.tokens.hive.enabled=false", + "--conf", "spark.yarn.security.tokens.hbase.enabled=false", + "--conf", "spark.yarn.security.credentials.hadoopfs.enabled=false", + "--conf", "spark.yarn.security.credentials.hive.enabled=false", + "--conf", "spark.yarn.security.credentials.hbase.enabled=false", + "--conf", "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGC -XX:+UnlockExperimentalVMOptions " + + "-Dlog4j.configuration=spark-log4j.properties", + "--conf", "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGC -XX:+UnlockExperimentalVMOptions " + + "-Dlog4j.configuration=spark-log4j.properties", + "--properties-file", "spark-defaults-oozie-generated.properties", + "--files", "spark-log4j.properties,hive-site.xml", + "--conf", "spark.yarn.jar=null", + "--verbose", "/lib/test.jar"), + sparkArgs); + } + private Properties readMergedProperties() throws IOException { final File file = new File(SPARK_DEFAULTS_GENERATED_PROPERTIES); file.deleteOnExit(); @@ -327,8 +369,13 @@ public class TestSparkArgsExtractor { @After public void cleanUp() throws Exception { - File f = new File("spark-defaults.conf"); - if(f.exists()) { + checkAndDeleteFile(SPARK_DEFAULTS_GENERATED_PROPERTIES); + checkAndDeleteFile(SPARK_DEFAULTS_PROPERTIES); + } + + private void checkAndDeleteFile(final String filename) { + final File f = new File(filename); + if (f.exists()) { f.delete(); } }