Repository: oozie Updated Branches: refs/heads/master e8bd9fc92 -> 98ad14bdc
OOZIE-2811 Add support for filtering out properties from SparkConfigurationService (gezapeti via rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/98ad14bd Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/98ad14bd Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/98ad14bd Branch: refs/heads/master Commit: 98ad14bdc021132bd4b0ad54e8edc9f71e0e2636 Parents: e8bd9fc Author: Robert Kanter <rkan...@apache.org> Authored: Tue Mar 7 15:18:14 2017 -0800 Committer: Robert Kanter <rkan...@apache.org> Committed: Tue Mar 7 15:18:14 2017 -0800 ---------------------------------------------------------------------- .../action/hadoop/SparkActionExecutor.java | 10 +- .../service/SparkConfigurationService.java | 144 +++++++++++-------- core/src/main/resources/oozie-default.xml | 18 ++- .../service/TestSparkConfigurationService.java | 130 +++++++++-------- release-log.txt | 1 + 5 files changed, 176 insertions(+), 127 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/98ad14bd/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java index e2e023e..1a3197a 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java @@ -32,7 +32,7 @@ import org.jdom.Namespace; import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.Properties; public class SparkActionExecutor extends JavaActionExecutor { public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain"; @@ -79,9 +79,11 @@ public class SparkActionExecutor extends JavaActionExecutor { StringBuilder sparkOptsSb = new StringBuilder(); if (master.startsWith("yarn")) { String resourceManager = actionConf.get(HADOOP_JOB_TRACKER); - Map<String, String> sparkConfig = Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager); - for (Map.Entry<String, String> entry : sparkConfig.entrySet()) { - sparkOptsSb.append("--conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" "); + Properties sparkConfig = + Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager); + for (String property : sparkConfig.stringPropertyNames()) { + sparkOptsSb.append("--conf ") + .append(property).append("=").append(sparkConfig.getProperty(property)).append(" "); } } String sparkOpts = actionXml.getChildTextTrim("spark-opts", ns); http://git-wip-us.apache.org/repos/asf/oozie/blob/98ad14bd/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java index b29ab8d..b15cce0 100644 --- a/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java +++ b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java @@ -18,19 +18,19 @@ package org.apache.oozie.service; -import org.apache.hadoop.conf.Configuration; -import org.apache.oozie.ErrorCode; -import org.apache.oozie.util.IOUtils; -import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XLog; import java.io.File; +import java.io.FileInputStream; import java.io.FileReader; import java.io.IOException; -import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Set; public class SparkConfigurationService implements Service { @@ -39,20 +39,25 @@ public class SparkConfigurationService implements Service { public static final String CONF_PREFIX = Service.CONF_PREFIX + "SparkConfigurationService."; public static final String SPARK_CONFIGURATIONS = CONF_PREFIX + "spark.configurations"; public static final String SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR - = CONF_PREFIX + "spark.configurations.ignore.spark.yarn.jar"; + = SPARK_CONFIGURATIONS + ".ignore.spark.yarn.jar"; + public static final String SPARK_CONFIGURATIONS_BLACKLIST = SPARK_CONFIGURATIONS + ".blacklist"; - private Map<String, Map<String, String>> sparkConfigs; private static final String SPARK_CONFIG_FILE = "spark-defaults.conf"; private static final String SPARK_YARN_JAR_PROP = "spark.yarn.jar"; + private static final String HOST_WILDCARD = "*"; + private Map<String, Properties> sparkConfigs; + private Set<String> blacklist; @Override public void init(Services services) throws ServiceException { + loadBlacklist(); loadSparkConfigs(); } @Override public void destroy() { sparkConfigs.clear(); + blacklist.clear(); } @Override @@ -60,76 +65,89 @@ public class SparkConfigurationService implements Service { return SparkConfigurationService.class; } + private void loadBlacklist() { + blacklist = new HashSet<>(); + for(String s : ConfigurationService.getStrings(SPARK_CONFIGURATIONS_BLACKLIST)) { + blacklist.add(s.trim()); + } + // spark.yarn.jar is added if the old property to ignore it is set. + if(ConfigurationService.getBoolean(SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR)){ + LOG.warn("Deprecated property found in configuration: " + SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR + + "Use "+SPARK_CONFIGURATIONS_BLACKLIST+" instead."); + blacklist.add(SPARK_YARN_JAR_PROP); + } + } + private void loadSparkConfigs() throws ServiceException { - sparkConfigs = new HashMap<String, Map<String, String>>(); - File configDir = new File(ConfigurationService.getConfigurationDirectory()); + sparkConfigs = new HashMap<>(); String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATIONS); - if (confDefs != null) { - boolean ignoreSparkYarnJar = ConfigurationService.getBoolean(SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR); - for (String confDef : confDefs) { - if (confDef.trim().length() > 0) { - String[] parts = confDef.split("="); - if (parts.length == 2) { - String hostPort = parts[0]; - String confDir = parts[1]; - File dir = new File(confDir); - if (!dir.isAbsolute()) { - dir = new File(configDir, confDir); - } - if (dir.exists()) { - File file = new File(dir, SPARK_CONFIG_FILE); - if (file.exists()) { - Properties props = new Properties(); - FileReader fr = null; - try { - fr = new FileReader(file); - props.load(fr); - fr.close(); - if (ignoreSparkYarnJar) { - // Ignore spark.yarn.jar because it may interfere with the Spark Sharelib jars - props.remove(SPARK_YARN_JAR_PROP); - } - sparkConfigs.put(hostPort, propsToMap(props)); - LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath()); - } catch (IOException ioe) { - LOG.warn("Spark Configuration could not be loaded for {0}: {1}", - hostPort, ioe.getMessage(), ioe); - } finally { - IOUtils.closeSafely(fr); - } - } else { - LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist", - hostPort, file.getAbsolutePath()); - } - } else { - LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist", - hostPort, dir.getAbsolutePath()); - } - } else { - LOG.warn("Spark Configuration could not be loaded: invalid value found: {0}", confDef); - } + for (String confDef : confDefs) { + readEntry(confDef.trim()); + } + } + + private void readEntry(String confDef) throws ServiceException { + String[] parts = confDef.split("="); + if (parts.length == 2) { + String hostPort = parts[0]; + String confDir = parts[1]; + File dir = getAbsoluteDir(confDir); + if (dir.exists()) { + Properties sparkDefaults = readSparkConfigFile(hostPort, dir); + filterBlackList(sparkDefaults); + if(!sparkDefaults.isEmpty()) { + sparkConfigs.put(hostPort, sparkDefaults); } + } else { + LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist", + hostPort, dir.getAbsolutePath()); } } else { - LOG.info("Spark Configuration(s) not specified"); + LOG.warn("Spark Configuration could not be loaded: invalid value found: {0}", confDef); } } - private Map<String, String> propsToMap(Properties props) { - Map<String, String> map = new HashMap<String, String>(props.size()); - for (String key : props.stringPropertyNames()) { - map.put(key, props.getProperty(key)); + private File getAbsoluteDir(String confDir) throws ServiceException { + File dir = new File(confDir); + if (!dir.isAbsolute()) { + File configDir = new File(ConfigurationService.getConfigurationDirectory()); + dir = new File(configDir, confDir); + } + return dir; + } + + private void filterBlackList(Properties sparkDefaults) { + for(String property : blacklist){ + sparkDefaults.remove(property); + } + } + + private Properties readSparkConfigFile(String hostPort, File dir) { + File file = new File(dir, SPARK_CONFIG_FILE); + Properties props = new Properties(); + if (file.exists()) { + try (FileInputStream stream = new FileInputStream(file); + InputStreamReader reader = new InputStreamReader(stream, StandardCharsets.UTF_8.name())) { + props.load(reader); + LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath()); + } catch (IOException ioe) { + LOG.warn("Spark Configuration could not be loaded for {0}: {1}", + hostPort, ioe.getMessage(), ioe); + } + } else { + LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist", + hostPort, file.getAbsolutePath()); } - return map; + return props; } - public Map<String, String> getSparkConfig(String resourceManagerHostPort) { + public Properties getSparkConfig(String resourceManagerHostPort) { resourceManagerHostPort = (resourceManagerHostPort != null) ? resourceManagerHostPort.toLowerCase() : null; - Map<String, String> config = sparkConfigs.get(resourceManagerHostPort); + Properties config = sparkConfigs.get(resourceManagerHostPort); if (config == null) { - config = sparkConfigs.get("*"); + config = sparkConfigs.get(HOST_WILDCARD); if (config == null) { - config = new HashMap<String, String>(); + config = new Properties(); } } return config; http://git-wip-us.apache.org/repos/asf/oozie/blob/98ad14bd/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 95e0c36..b481887 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2881,13 +2881,23 @@ will be the requeue interval for the actions which are waiting for a long time w </property> <property> + <name>oozie.service.SparkConfigurationService.spark.configurations.blacklist</name> + <value>spark.yarn.jar,spark.yarn.jars</value> + <description> + Comma separated list of properties to ignore from any Spark configurations specified in + oozie.service.SparkConfigurationService.spark.configurations property. + </description> + </property> + + <property> <name>oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar</name> <value>true</value> <description> - If true, Oozie will ignore the "spark.yarn.jar" property from any Spark configurations specified in - oozie.service.SparkConfigurationService.spark.configurations. If false, Oozie will not ignore it. It is recommended - to leave this as true because it can interfere with the jars in the Spark sharelib. - </description> + Deprecated. Use oozie.service.SparkConfigurationService.spark.configurations.blacklist instead. + If true, Oozie will ignore the "spark.yarn.jar" property from any Spark configurations specified in + oozie.service.SparkConfigurationService.spark.configurations. If false, Oozie will not ignore it. It is recommended + to leave this as true because it can interfere with the jars in the Spark sharelib. + </description> </property> <property> http://git-wip-us.apache.org/repos/asf/oozie/blob/98ad14bd/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java index 9d82fdc..0e00a45 100644 --- a/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java +++ b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,21 +18,11 @@ package org.apache.oozie.service; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.oozie.ErrorCode; import org.apache.oozie.test.XTestCase; -import org.apache.oozie.util.IOUtils; import java.io.File; import java.io.FileOutputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.util.Map; +import java.io.IOException; import java.util.Properties; public class TestSparkConfigurationService extends XTestCase { @@ -53,40 +43,14 @@ public class TestSparkConfigurationService extends XTestCase { scs.destroy(); ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations", ""); scs.init(Services.get()); - Map<String, String> sparkConfigs = scs.getSparkConfig("foo"); + Properties sparkConfigs = scs.getSparkConfig("foo"); assertEquals(0, sparkConfigs.size()); } public void testSparkConfigs() throws Exception { - File sparkConf1Dir = new File(getTestCaseConfDir(), "spark-conf-1"); - File sparkConf3Dir = new File(getTestCaseConfDir(), "spark-conf-3"); - File sparkConf4Dir = new File(getTestCaseConfDir(), "spark-conf-4"); - sparkConf1Dir.mkdirs(); - sparkConf3Dir.mkdirs(); - sparkConf4Dir.mkdirs(); - File sparkConf1 = new File(sparkConf1Dir, "spark-defaults.conf"); - Properties sparkConf1Props = new Properties(); - sparkConf1Props.setProperty("a", "A"); - sparkConf1Props.setProperty("b", "B"); - sparkConf1Props.setProperty("spark.yarn.jar", "foo"); // should be ignored by default - FileOutputStream fos = null; - try { - fos = new FileOutputStream(sparkConf1); - sparkConf1Props.store(fos, ""); - } finally { - IOUtils.closeSafely(fos); - } - File sparkConf4 = new File(sparkConf4Dir, "spark-defaults.conf"); - Properties sparkConf4Props = new Properties(); - sparkConf4Props.setProperty("y", "Y"); - sparkConf4Props.setProperty("z", "Z"); - fos = null; - try { - fos = new FileOutputStream(sparkConf4); - sparkConf4Props.store(fos, ""); - } finally { - IOUtils.closeSafely(fos); - } + File sparkConf1Dir = createSparkConfsInDir("spark-conf-1", "a", "A", "b", "B", "spark.yarn.jar", "foo"); + File sparkConf3Dir = createSparkConfsInDir("spark-conf-3"); + File sparkConf4Dir = createSparkConfsInDir("spark-conf-4", "y", "Y", "z", "Z", "spark.yarn.jars", "foo2"); SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class); scs.destroy(); @@ -96,48 +60,102 @@ public class TestSparkConfigurationService extends XTestCase { ",rm3=" + sparkConf3Dir.getAbsolutePath() + // missing file ",rm4=" + sparkConf4Dir.getName()); // relative path scs.init(Services.get()); - Map<String, String> sparkConfigs = scs.getSparkConfig("foo"); - assertEquals(0, sparkConfigs.size()); + Properties sparkConfigs = scs.getSparkConfig("foo"); + assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size()); sparkConfigs = scs.getSparkConfig("rm1"); - assertEquals(2, sparkConfigs.size()); + assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size()); assertEquals("A", sparkConfigs.get("a")); assertEquals("B", sparkConfigs.get("b")); sparkConfigs = scs.getSparkConfig("rm2"); - assertEquals(0, sparkConfigs.size()); + assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size()); sparkConfigs = scs.getSparkConfig("rm3"); - assertEquals(0, sparkConfigs.size()); + assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size()); sparkConfigs = scs.getSparkConfig("rm4"); - assertEquals(2, sparkConfigs.size()); + assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size()); assertEquals("Y", sparkConfigs.get("y")); assertEquals("Z", sparkConfigs.get("z")); scs.destroy(); // Setting this to false should make it not ignore spark.yarn.jar ConfigurationService.setBoolean("oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar", false); + ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations.blacklist", " "); scs.init(Services.get()); sparkConfigs = scs.getSparkConfig("rm1"); - assertEquals(3, sparkConfigs.size()); + assertEquals(sparkConfigs.toString(), 3, sparkConfigs.size()); assertEquals("A", sparkConfigs.get("a")); assertEquals("B", sparkConfigs.get("b")); assertEquals("foo", sparkConfigs.get("spark.yarn.jar")); ConfigurationService.setBoolean("oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar", true); - + ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations.blacklist", "spark.yarn.jar,spark.yarn.jars"); scs.destroy(); ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations", - "rm1=" + sparkConf1Dir.getAbsolutePath() + // defined + "rm1=" + sparkConf1Dir.getAbsolutePath() + // define ",*=" + sparkConf4Dir.getAbsolutePath()); // wildcard scs.init(Services.get()); sparkConfigs = scs.getSparkConfig("rm1"); - assertEquals(2, sparkConfigs.size()); + assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size()); assertEquals("A", sparkConfigs.get("a")); assertEquals("B", sparkConfigs.get("b")); sparkConfigs = scs.getSparkConfig("rm2"); - assertEquals(2, sparkConfigs.size()); + assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size()); assertEquals("Y", sparkConfigs.get("y")); assertEquals("Z", sparkConfigs.get("z")); sparkConfigs = scs.getSparkConfig("foo"); - assertEquals(2, sparkConfigs.size()); + assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size()); assertEquals("Y", sparkConfigs.get("y")); assertEquals("Z", sparkConfigs.get("z")); } + + private File createSparkConfsInDir(String directory, String... props) throws IOException { + File sparkConf1Dir = new File(getTestCaseConfDir(), directory); + sparkConf1Dir.mkdirs(); + File sparkConf1 = new File(sparkConf1Dir, "spark-defaults.conf"); + Properties sparkConf1Props = new Properties(); + for (int i = 0; i < props.length; i += 2) { + sparkConf1Props.setProperty(props[i], props[i + 1]); + } + if (!sparkConf1Props.isEmpty()) { + try (FileOutputStream fos = new FileOutputStream(sparkConf1)) { + sparkConf1Props.store(fos, ""); + } + } + return sparkConf1Dir; + } + + public void testBlackList() throws Exception { + File sparkConf1Dir = createSparkConfsInDir("spark-conf-1", "a", "A", "b", "B", + "spark.yarn.jar", "foo"); + File sparkConf3Dir = createSparkConfsInDir("spark-conf-3"); + File sparkConf4Dir = createSparkConfsInDir("spark-conf-4", "y", "Y", "z", "Z", + "spark.yarn.jars", "foo2"); + + SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class); + scs.destroy(); + ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations", + "rm1=" + sparkConf1Dir.getAbsolutePath() + // absolute path + ",rm2" + // invalid entry + ",rm3=" + sparkConf3Dir.getAbsolutePath() + // missing file + ",rm4=" + sparkConf4Dir.getName()); // relative path + ConfigurationService.setBoolean("oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar", + false); + ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations.blacklist", "a,z"); + scs.init(Services.get()); + Properties sparkConfigs = scs.getSparkConfig("foo"); + assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size()); + sparkConfigs = scs.getSparkConfig("rm1"); + assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size()); + assertEquals("B", sparkConfigs.get("b")); + assertEquals("foo", sparkConfigs.get("spark.yarn.jar")); + sparkConfigs = scs.getSparkConfig("rm2"); + assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size()); + sparkConfigs = scs.getSparkConfig("rm3"); + assertEquals(sparkConfigs.toString(), 0, sparkConfigs.size()); + sparkConfigs = scs.getSparkConfig("rm4"); + assertEquals(sparkConfigs.toString(), 2, sparkConfigs.size()); + assertEquals("Y", sparkConfigs.get("y")); + assertEquals("foo2", sparkConfigs.get("spark.yarn.jars")); + scs.destroy(); + } + + } http://git-wip-us.apache.org/repos/asf/oozie/blob/98ad14bd/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 6937e24..0fe1e1d 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.4.0 release (trunk - unreleased) +OOZIE-2811 Add support for filtering out properties from SparkConfigurationService (gezapeti via rkanter) OOZIE-2802 Spark action failure on Spark 2.1.0 due to duplicate sharelibs (gezapeti via rkanter) OOZIE-2803 Mask passwords when printing out configs/args in MapReduceMain and SparkMain (pbacsko via rkanter) OOZIE-2799 Setting log location for spark sql on hive (satishsaley)