Repository: oozie Updated Branches: refs/heads/master 5f1444496 -> 299370b44
OOZIE-3112 SparkConfigrationService overwrites properties provided via --properties-file option in SparkAction (gezapeti) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/299370b4 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/299370b4 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/299370b4 Branch: refs/heads/master Commit: 299370b44e2d7b22bfe622d19f65b02c5b18282f Parents: 5f14444 Author: Gezapeti Cseh <gezap...@apache.org> Authored: Fri Dec 1 12:10:11 2017 +0100 Committer: Gezapeti Cseh <gezap...@apache.org> Committed: Fri Dec 1 12:10:11 2017 +0100 ---------------------------------------------------------------------- .../action/hadoop/SparkActionExecutor.java | 22 ++-- .../site/twiki/DG_SparkActionExtension.twiki | 18 +++- release-log.txt | 1 + sharelib/spark/pom.xml | 4 + .../oozie/action/hadoop/SparkArgsExtractor.java | 100 +++++++++++++++++-- .../action/hadoop/TestSparkActionExecutor.java | 25 +++-- .../action/hadoop/TestSparkArgsExtractor.java | 81 +++++++++++++++ 7 files changed, 223 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java index 2338ad2..5f399c4 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java @@ -18,6 +18,7 @@ package org.apache.oozie.action.hadoop; +import com.google.common.base.Strings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.oozie.action.ActionExecutorException; @@ -27,6 
+28,8 @@ import org.apache.oozie.service.SparkConfigurationService; import org.jdom.Element; import org.jdom.Namespace; +import java.io.IOException; +import java.io.StringWriter; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -37,6 +40,7 @@ public class SparkActionExecutor extends JavaActionExecutor { public static final String SPARK_MASTER = "oozie.spark.master"; public static final String SPARK_MODE = "oozie.spark.mode"; public static final String SPARK_OPTS = "oozie.spark.spark-opts"; + public static final String SPARK_DEFAULT_OPTS = "oozie.spark.spark-default-opts"; public static final String SPARK_JOB_NAME = "oozie.spark.name"; public static final String SPARK_CLASS = "oozie.spark.class"; public static final String SPARK_JAR = "oozie.spark.jar"; @@ -72,22 +76,22 @@ public class SparkActionExecutor extends JavaActionExecutor { String jarLocation = actionXml.getChildTextTrim("jar", ns); actionConf.set(SPARK_JAR, jarLocation); - StringBuilder sparkOptsSb = new StringBuilder(); if (master.startsWith("yarn")) { String resourceManager = actionConf.get(HADOOP_YARN_RM); Properties sparkConfig = Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager); - for (String property : sparkConfig.stringPropertyNames()) { - sparkOptsSb.append("--conf ") - .append(property).append("=").append(sparkConfig.getProperty(property)).append(" "); + if (!sparkConfig.isEmpty()) { + try (final StringWriter sw = new StringWriter()) { + sparkConfig.store(sw, "Generated by Oozie server SparkActionExecutor"); + actionConf.set(SPARK_DEFAULT_OPTS, sw.toString()); + } catch (IOException e) { + LOG.warn("Could not propagate Spark default configuration!", e); + } } } String sparkOpts = actionXml.getChildTextTrim("spark-opts", ns); - if (sparkOpts != null) { - sparkOptsSb.append(sparkOpts); - } - if (sparkOptsSb.length() > 0) { - actionConf.set(SPARK_OPTS, sparkOptsSb.toString().trim()); + if (!Strings.isNullOrEmpty(sparkOpts)) { + 
actionConf.set(SPARK_OPTS, sparkOpts.toString().trim()); } // Setting if SparkMain should setup hadoop config *-site.xml http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/docs/src/site/twiki/DG_SparkActionExtension.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki index 45b60b8..294e7dd 100644 --- a/docs/src/site/twiki/DG_SparkActionExtension.twiki +++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki @@ -97,16 +97,26 @@ The =class= element if present, indicates the spark's application main class. The =jar= element indicates a comma separated list of jars or python files. -The =spark-opts= element, if present, contains a list of spark options that can be passed to spark driver. Spark configuration -options can be passed by specifying '--conf key=value' here, or from =oozie.service.SparkConfigurationService.spark.configurations= -in oozie-site.xml. Values containing whitespaces can be enclosed by double quotes. The =spark-opts= configs have priority. +The =spark-opts= element, if present, contains a list of Spark options that can be passed to Spark. Spark configuration +options can be passed by specifying '--conf key=value' or other Spark CLI options. +Values containing whitespaces can be enclosed by double quotes. Some examples of the =spark-opts= element: * '--conf key=value' * '--conf key1=value1 value2' * '--conf key1="value1 value2"' * '--conf key1=value1 key2="value2 value3"' - * '--conf key=value --verbose' + * '--conf key=value --verbose --properties-file user.properties' + +There are several ways to define properties that will be passed to Spark. 
They are processed in the following order: + * propagated from =oozie.service.SparkConfigurationService.spark.configurations= + * read from a localized =spark-defaults.conf= file + * read from a file defined in =spark-opts= via the =--properties-file= option + * properties defined in =spark-opts= element + +(A property from a later source in this list overrides the same property from an earlier one.) +The server propagated properties, the =spark-defaults.conf= and the user-defined properties file are merged together into a +single properties file as Spark handles only one file in its =--properties-file= option. The =arg= element if present, contains arguments that can be passed to spark application. http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 9a3f7b5..d10bdde 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.0.0 release (trunk - unreleased) +OOZIE-3112 SparkConfigrationService overwrites properties provided via --properties-file option in SparkAction (gezapeti) OOZIE-3126 Add option to allow list of users to access system config (satishsaley) OOZIE-2900 Retrieve tokens for oozie.launcher.mapreduce.job.hdfs-servers before submission (asasvari) OOZIE-3132 Instrument SLACalculatorMemory (andras.piros) http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/sharelib/spark/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml index 65f641a..e9174c2 100644 --- a/sharelib/spark/pom.xml +++ b/sharelib/spark/pom.xml @@ -355,6 +355,10 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>com.google.code.findbugs</groupId> + <artifactId>annotations</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java index 052950f..2f3cfbe 100644 --- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java +++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java @@ -19,6 +19,7 @@ package org.apache.oozie.action.hadoop; import com.google.common.annotations.VisibleForTesting; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.commons.lang.StringUtils; import org.apache.directory.api.util.Strings; import org.apache.hadoop.conf.Configuration; @@ -26,16 +27,28 @@ import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Reader; +import java.io.StringReader; +import java.io.Writer; import java.net.URI; import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.regex.Pattern; +import static org.apache.oozie.action.hadoop.SparkActionExecutor.SPARK_DEFAULT_OPTS; + +@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "Properties file should be specified by user") class SparkArgsExtractor { private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf"); private static final String FILES_OPTION = "--files"; @@ -71,6 +84,7 @@ class SparkArgsExtractor { private static final String DEPLOY_MODE_CLIENT = "client"; private static final String SPARK_YARN_TAGS = "spark.yarn.tags"; private static 
final String OPT_PROPERTIES_FILE = "--properties-file"; + public static final String SPARK_DEFAULTS_GENERATED_PROPERTIES = "spark-defaults-oozie-generated.properties"; private boolean pySpark = false; private final Configuration actionConf; @@ -135,6 +149,7 @@ class SparkArgsExtractor { final StringBuilder userFiles = new StringBuilder(); final StringBuilder userArchives = new StringBuilder(); final String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS); + String propertiesFile = null; if (StringUtils.isNotEmpty(sparkOpts)) { final List<String> sparkOptions = SparkOptionsSplitter.splitSparkOpts(sparkOpts); for (int i = 0; i < sparkOptions.size(); i++) { @@ -177,7 +192,11 @@ class SparkArgsExtractor { if (opt.startsWith(SECURITY_CREDENTIALS_HBASE)) { addedSecurityCredentialsHBase = true; } - + if (opt.startsWith(OPT_PROPERTIES_FILE)){ + i++; + propertiesFile = sparkOptions.get(i); + addToSparkArgs = false; + } if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS) || opt.startsWith(DRIVER_EXTRA_JAVA_OPTIONS)) { if (!opt.contains(LOG4J_CONFIGURATION_JAVA_OPTION)) { opt += " " + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS; @@ -283,11 +302,7 @@ class SparkArgsExtractor { sparkArgs.add(CONF_OPTION); sparkArgs.add(DRIVER_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS); } - final File defaultConfFile = SparkMain.getMatchingFile(SPARK_DEFAULTS_FILE_PATTERN); - if (defaultConfFile != null) { - sparkArgs.add(OPT_PROPERTIES_FILE); - sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString()); - } + mergeAndAddPropertiesFile(sparkArgs, propertiesFile); if ((yarnClusterMode || yarnClientMode)) { final Map<String, URI> fixedFileUrisMap = @@ -326,6 +341,79 @@ class SparkArgsExtractor { return sparkArgs; } + private void mergeAndAddPropertiesFile(final List<String> sparkArgs, final String userDefinedPropertiesFile) + throws IOException { + final Properties properties = new Properties(); + loadServerDefaultProperties(properties); 
+ loadLocalizedDefaultPropertiesFile(properties); + loadUserDefinedPropertiesFile(userDefinedPropertiesFile, properties); + final boolean persisted = persistMergedProperties(properties); + if (persisted) { + sparkArgs.add(OPT_PROPERTIES_FILE); + sparkArgs.add(SPARK_DEFAULTS_GENERATED_PROPERTIES); + } + } + + private boolean persistMergedProperties(final Properties properties) throws IOException { + if (!properties.isEmpty()) { + try (final Writer writer = new OutputStreamWriter( + new FileOutputStream(new File(SPARK_DEFAULTS_GENERATED_PROPERTIES)), + StandardCharsets.UTF_8.name())) { + properties.store(writer, "Properties file generated by Oozie"); + System.out.println(String.format("Persisted merged Spark configs in file %s. Merged properties are: %s", + SPARK_DEFAULTS_GENERATED_PROPERTIES, Arrays.toString(properties.stringPropertyNames().toArray()))); + return true; + } catch (IOException e) { + System.err.println(String.format("Could not persist derived Spark config file. Reason: %s", e.getMessage())); + throw e; + } + } + return false; + } + + private void loadUserDefinedPropertiesFile(final String userDefinedPropertiesFile, final Properties properties) { + if (userDefinedPropertiesFile != null) { + System.out.println(String.format("Reading Spark config from %s %s...", OPT_PROPERTIES_FILE, userDefinedPropertiesFile)); + loadProperties(new File(userDefinedPropertiesFile), properties); + } + } + + private void loadLocalizedDefaultPropertiesFile(final Properties properties) { + final File localizedDefaultConfFile = SparkMain.getMatchingFile(SPARK_DEFAULTS_FILE_PATTERN); + if (localizedDefaultConfFile != null) { + System.out.println(String.format("Reading Spark config from file %s...", localizedDefaultConfFile.getName())); + loadProperties(localizedDefaultConfFile, properties); + } + } + + private void loadServerDefaultProperties(final Properties properties) { + final String sparkDefaultsFromServer = actionConf.get(SPARK_DEFAULT_OPTS, ""); + if 
(!sparkDefaultsFromServer.isEmpty()) { + System.out.println("Reading Spark config propagated from Oozie server..."); + try (final StringReader reader = new StringReader(sparkDefaultsFromServer)) { + properties.load(reader); + } catch (IOException e) { + System.err.println(String.format("Could not read propagated Spark config! Reason: %s", e.getMessage())); + } + } + } + + private void loadProperties(final File file, final Properties target) { + try (final Reader reader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8.name())) { + final Properties properties = new Properties(); + properties.load(reader); + for(String key :properties.stringPropertyNames()) { + Object prevProperty = target.setProperty(key, properties.getProperty(key)); + if(prevProperty != null){ + System.out.println(String.format("Value of %s was overwritten from %s", key, file.getName())); + } + } + } catch (IOException e) { + System.err.println(String.format("Could not read Spark configs from file %s. 
Reason: %s", file.getName(), + e.getMessage())); + } + } + private void appendWithPathSeparator(final String what, final StringBuilder to) { if (to.length() > 0) { to.append(File.pathSeparator); http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java index d97f1f0..bf06d14 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java @@ -45,6 +45,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.io.OutputStream; +import java.io.StringReader; import java.io.Writer; import java.text.MessageFormat; @@ -82,12 +83,8 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { Properties sparkConfProps = new Properties(); sparkConfProps.setProperty("a", "A"); sparkConfProps.setProperty("b", "B"); - FileOutputStream fos = null; - try { - fos = new FileOutputStream(sparkConf); + try (FileOutputStream fos = new FileOutputStream(sparkConf)){ sparkConfProps.store(fos, ""); - } finally { - IOUtils.closeSafely(fos); } SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class); scs.destroy(); @@ -104,7 +101,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { } @SuppressWarnings("unchecked") - private void _testSetupMethods(String master, Map<String, String> extraSparkOpts, String mode) throws Exception { + private void _testSetupMethods(String master, Map<String, String> defaultSparkOpts, String mode) throws Exception { SparkActionExecutor ae = new SparkActionExecutor(); 
assertEquals(Arrays.asList(SparkMain.class), ae.getLauncherClasses()); @@ -137,16 +134,26 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { assertEquals(getNameNodeUri() + "/foo.jar", conf.get("oozie.spark.jar")); Map<String, String> sparkOpts = new HashMap<String, String>(); sparkOpts.put("foo", "bar"); - sparkOpts.putAll(extraSparkOpts); - Matcher m = SPARK_OPTS_PATTERN.matcher(conf.get("oozie.spark.spark-opts")); + sparkOpts.putAll(defaultSparkOpts); + final String configSparkOpts = conf.get("oozie.spark.spark-opts"); int count = 0; + Matcher m = SPARK_OPTS_PATTERN.matcher(configSparkOpts); while (m.find()) { count++; String key = m.group(1); String val = m.group(2); assertEquals(sparkOpts.get(key), val); } - assertEquals(sparkOpts.size(), count); + if(defaultSparkOpts.size() > 0) { + final String defaultCconfigSparkOpts = conf.get("oozie.spark.spark-default-opts"); + Properties p = new Properties(); + p.load(new StringReader(defaultCconfigSparkOpts)); + for(String key : defaultSparkOpts.keySet()){ + count++; + assertEquals(sparkOpts.get(key), p.getProperty(key)); + } + } + assertEquals(configSparkOpts, sparkOpts.size(), count); } private String getActionXml() { http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java index 574bf24..6a9baa5 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java @@ -20,11 +20,19 @@ package org.apache.oozie.action.hadoop; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; +import 
org.junit.After; import org.junit.Test; +import scala.util.PropertiesTrait; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Properties; +import static org.apache.oozie.action.hadoop.SparkArgsExtractor.SPARK_DEFAULTS_GENERATED_PROPERTIES; import static org.junit.Assert.assertEquals; public class TestSparkArgsExtractor { @@ -251,4 +259,77 @@ public class TestSparkArgsExtractor { "arg1"), sparkArgs); } + + @Test + public void testPropertiesFileMerging() throws Exception { + final Configuration actionConf = new Configuration(); + actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn"); + actionConf.set(SparkActionExecutor.SPARK_MODE, "client"); + actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy"); + actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File"); + actionConf.set(SparkActionExecutor.SPARK_DEFAULT_OPTS, "defaultProperty=1\ndefaultProperty2=2\ndefaultProperty3=3"); + actionConf.set(SparkActionExecutor.SPARK_OPTS, + "--properties-file foo.properties --conf spark.driver.extraJavaOptions=-Xmx234m"); + actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar"); + createTemporaryFileWithContent("spark-defaults.conf", "foo2=bar2\ndefaultProperty3=44\nfoo3=nobar");; + createTemporaryFileWithContent("foo.properties", "foo=bar\ndefaultProperty2=4\nfoo3=barbar"); + + final String[] mainArgs = {"arg0", "arg1"}; + final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs); + + Properties p = readMergedProperties(); + assertEquals("property defaultProperty should've been read from server-propagated config", + "1", p.get("defaultProperty")); + assertEquals("property defaultProperty2 should've been overwritten by user-defined foo.properties", + "4", p.get("defaultProperty2")); + assertEquals("property defaultProperty3 should've been overwritten by localized 
spark-defaults.conf", + "44", p.get("defaultProperty3")); + assertEquals("property foo should've been read from user-defined foo.properties", + "bar", p.get("foo")); + assertEquals("property foo2 should've been read from localized spark-defaults.conf", + "bar2", p.get("foo2")); + assertEquals("property foo3 should've been overwritten by user-defined foo.properties", + "barbar", p.get("foo3")); + assertEquals("Spark args mismatch", + Lists.newArrayList("--master", "yarn", "--deploy-mode", "client", "--name", "Spark Copy File", + "--class", "org.apache.oozie.example.SparkFileCopy", "--conf", + "spark.driver.extraJavaOptions=-Xmx234m -Dlog4j.configuration=spark-log4j.properties", "--conf", + "spark.executor.extraClassPath=$PWD/*", "--conf", "spark.driver.extraClassPath=$PWD/*", "--conf", + "spark.yarn.security.tokens.hadoopfs.enabled=false", "--conf", + "spark.yarn.security.tokens.hive.enabled=false", "--conf", "spark.yarn.security.tokens.hbase.enabled=false", + "--conf", "spark.yarn.security.credentials.hadoopfs.enabled=false", "--conf", + "spark.yarn.security.credentials.hive.enabled=false", "--conf", + "spark.yarn.security.credentials.hbase.enabled=false", "--conf", + "spark.executor.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties", + "--properties-file", "spark-defaults-oozie-generated.properties", "--files", + "spark-log4j.properties,hive-site.xml", "--conf", "spark.yarn.jar=null", "--verbose", "/lib/test.jar", + "arg0", "arg1"), + sparkArgs); + } + + private Properties readMergedProperties() throws IOException { + final File file = new File(SPARK_DEFAULTS_GENERATED_PROPERTIES); + file.deleteOnExit(); + final Properties properties = new Properties(); + try(final FileReader reader = new FileReader(file)) { + properties.load(reader); + } + return properties; + } + + private void createTemporaryFileWithContent(String filename, String content) throws IOException { + final File file = new File(filename); + file.deleteOnExit(); + try(final FileWriter 
fileWriter = new FileWriter(file)) { + fileWriter.write(content); + } + } + + @After + public void cleanUp() throws Exception { + File f = new File("spark-defaults.conf"); + if(f.exists()) { + f.delete(); + } + } } \ No newline at end of file