Repository: oozie
Updated Branches:
  refs/heads/master 5f1444496 -> 299370b44


OOZIE-3112 SparkConfigurationService overwrites properties provided via --properties-file option in SparkAction (gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/299370b4
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/299370b4
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/299370b4

Branch: refs/heads/master
Commit: 299370b44e2d7b22bfe622d19f65b02c5b18282f
Parents: 5f14444
Author: Gezapeti Cseh <gezap...@apache.org>
Authored: Fri Dec 1 12:10:11 2017 +0100
Committer: Gezapeti Cseh <gezap...@apache.org>
Committed: Fri Dec 1 12:10:11 2017 +0100

----------------------------------------------------------------------
 .../action/hadoop/SparkActionExecutor.java      |  22 ++--
 .../site/twiki/DG_SparkActionExtension.twiki    |  18 +++-
 release-log.txt                                 |   1 +
 sharelib/spark/pom.xml                          |   4 +
 .../oozie/action/hadoop/SparkArgsExtractor.java | 100 +++++++++++++++++--
 .../action/hadoop/TestSparkActionExecutor.java  |  25 +++--
 .../action/hadoop/TestSparkArgsExtractor.java   |  81 +++++++++++++++
 7 files changed, 223 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
index 2338ad2..5f399c4 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.action.hadoop;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.action.ActionExecutorException;
@@ -27,6 +28,8 @@ import org.apache.oozie.service.SparkConfigurationService;
 import org.jdom.Element;
 import org.jdom.Namespace;
 
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +40,7 @@ public class SparkActionExecutor extends JavaActionExecutor {
     public static final String SPARK_MASTER = "oozie.spark.master";
     public static final String SPARK_MODE = "oozie.spark.mode";
     public static final String SPARK_OPTS = "oozie.spark.spark-opts";
+    public static final String SPARK_DEFAULT_OPTS = "oozie.spark.spark-default-opts";
     public static final String SPARK_JOB_NAME = "oozie.spark.name";
     public static final String SPARK_CLASS = "oozie.spark.class";
     public static final String SPARK_JAR = "oozie.spark.jar";
@@ -72,22 +76,22 @@ public class SparkActionExecutor extends JavaActionExecutor {
         String jarLocation = actionXml.getChildTextTrim("jar", ns);
         actionConf.set(SPARK_JAR, jarLocation);
 
-        StringBuilder sparkOptsSb = new StringBuilder();
         if (master.startsWith("yarn")) {
             String resourceManager = actionConf.get(HADOOP_YARN_RM);
             Properties sparkConfig =
                     Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager);
-            for (String property : sparkConfig.stringPropertyNames()) {
-                sparkOptsSb.append("--conf ")
-                        .append(property).append("=").append(sparkConfig.getProperty(property)).append(" ");
+            if (!sparkConfig.isEmpty()) {
+                try (final StringWriter sw = new StringWriter()) {
+                    sparkConfig.store(sw, "Generated by Oozie server SparkActionExecutor");
+                    actionConf.set(SPARK_DEFAULT_OPTS, sw.toString());
+                } catch (IOException e) {
+                    LOG.warn("Could not propagate Spark default configuration!", e);
+                }
             }
         }
         String sparkOpts = actionXml.getChildTextTrim("spark-opts", ns);
-        if (sparkOpts != null) {
-            sparkOptsSb.append(sparkOpts);
-        }
-        if (sparkOptsSb.length() > 0) {
-            actionConf.set(SPARK_OPTS, sparkOptsSb.toString().trim());
+        if (!Strings.isNullOrEmpty(sparkOpts)) {
+            actionConf.set(SPARK_OPTS, sparkOpts.trim());
         }
 
         // Setting if SparkMain should setup hadoop config *-site.xml

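The hunk above replaces string concatenation of "--conf key=value" pairs with a single serialized
properties blob: the server stores its Spark defaults via java.util.Properties.store(Writer, String)
into the new oozie.spark.spark-default-opts action-conf value. A minimal standalone sketch of that
round-trip (class name and property value are invented; only the standard java.util.Properties API
is assumed):

    import java.io.StringReader;
    import java.io.StringWriter;
    import java.util.Properties;

    public class PropertiesRoundTrip {
        public static void main(String[] args) throws Exception {
            // Server side: Properties -> String, as SparkActionExecutor does above.
            final Properties serverDefaults = new Properties();
            serverDefaults.setProperty("spark.executor.memory", "2g");
            final StringWriter sw = new StringWriter();
            serverDefaults.store(sw, "Generated by Oozie server SparkActionExecutor");
            final String propagated = sw.toString();

            // Launcher side: String -> Properties, as SparkArgsExtractor does
            // when it reloads the propagated defaults.
            final Properties reloaded = new Properties();
            reloaded.load(new StringReader(propagated));
            System.out.println(reloaded.getProperty("spark.executor.memory")); // 2g
        }
    }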
http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index 45b60b8..294e7dd 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -97,16 +97,26 @@ The =class= element if present, indicates the spark's application main class.
 
 The =jar= element indicates a comma separated list of jars or python files.
 
-The =spark-opts= element, if present, contains a list of spark options that can be passed to spark driver. Spark configuration
-options can be passed by specifying '--conf key=value' here, or from =oozie.service.SparkConfigurationService.spark.configurations=
-in oozie-site.xml. Values containing whitespaces can be enclosed by double quotes. The =spark-opts= configs have priority.
+The =spark-opts= element, if present, contains a list of Spark options that can be passed to Spark. Spark configuration
+options can be passed by specifying '--conf key=value' or other Spark CLI options.
+Values containing whitespaces can be enclosed by double quotes.
 
 Some examples of the =spark-opts= element:
    * '--conf key=value'
    * '--conf key1=value1 value2'
    * '--conf key1="value1 value2"'
    * '--conf key1=value1 key2="value2 value3"'
-   * '--conf key=value --verbose'
+   * '--conf key=value --verbose --properties-file user.properties'
+
+There are several ways to define properties that will be passed to Spark. They are processed in the following order:
+    * propagated from =oozie.service.SparkConfigurationService.spark.configurations=
+    * read from a localized =spark-defaults.conf= file
+    * read from a file defined in =spark-opts= via the =--properties-file= option
+    * properties defined in the =spark-opts= element
+
+(Each later source takes precedence over the earlier ones.)
+The server-propagated properties, the =spark-defaults.conf= file and the user-defined properties file are merged together into a
+single properties file, as Spark handles only one file in its =--properties-file= option.
 
 The =arg= element if present, contains arguments that can be passed to spark application.
 

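A worked sketch of the precedence list documented above (class name, keys and values are invented
for illustration): applying the sources in the documented order, with each later source overwriting
matching keys, yields the contents of the single merged properties file.

    import java.util.Arrays;
    import java.util.Properties;

    public class SparkOptsPrecedence {
        public static void main(String[] args) {
            final Properties server = new Properties();     // SparkConfigurationService
            server.setProperty("spark.executor.memory", "1g");
            server.setProperty("spark.eventLog.enabled", "true");

            final Properties localized = new Properties();  // localized spark-defaults.conf
            localized.setProperty("spark.executor.memory", "2g");

            final Properties userFile = new Properties();   // file from --properties-file
            userFile.setProperty("spark.executor.memory", "4g");

            final Properties merged = new Properties();
            for (final Properties source : Arrays.asList(server, localized, userFile)) {
                merged.putAll(source); // later sources win on key collisions
            }
            // merged: spark.executor.memory=4g, spark.eventLog.enabled=true
            System.out.println(merged);
        }
    }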
http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 9a3f7b5..d10bdde 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.0.0 release (trunk - unreleased)
 
+OOZIE-3112 SparkConfigurationService overwrites properties provided via --properties-file option in SparkAction (gezapeti)
 OOZIE-3126 Add option to allow list of users to access system config (satishsaley)
 OOZIE-2900 Retrieve tokens for oozie.launcher.mapreduce.job.hdfs-servers before submission (asasvari)
 OOZIE-3132 Instrument SLACalculatorMemory (andras.piros)

http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index 65f641a..e9174c2 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -355,6 +355,10 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>annotations</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
index 052950f..2f3cfbe 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
@@ -19,6 +19,7 @@
 package org.apache.oozie.action.hadoop;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.commons.lang.StringUtils;
 import org.apache.directory.api.util.Strings;
 import org.apache.hadoop.conf.Configuration;
@@ -26,16 +27,28 @@ import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
 
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.StringReader;
+import java.io.Writer;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Pattern;
 
+import static org.apache.oozie.action.hadoop.SparkActionExecutor.SPARK_DEFAULT_OPTS;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "Properties file should be specified by user")
 class SparkArgsExtractor {
     private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf");
     private static final String FILES_OPTION = "--files";
@@ -71,6 +84,7 @@ class SparkArgsExtractor {
     private static final String DEPLOY_MODE_CLIENT = "client";
     private static final String SPARK_YARN_TAGS = "spark.yarn.tags";
     private static final String OPT_PROPERTIES_FILE = "--properties-file";
+    public static final String SPARK_DEFAULTS_GENERATED_PROPERTIES = "spark-defaults-oozie-generated.properties";
 
     private boolean pySpark = false;
     private final Configuration actionConf;
@@ -135,6 +149,7 @@ class SparkArgsExtractor {
         final StringBuilder userFiles = new StringBuilder();
         final StringBuilder userArchives = new StringBuilder();
         final String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
+        String propertiesFile = null;
         if (StringUtils.isNotEmpty(sparkOpts)) {
             final List<String> sparkOptions = SparkOptionsSplitter.splitSparkOpts(sparkOpts);
             for (int i = 0; i < sparkOptions.size(); i++) {
@@ -177,7 +192,11 @@ class SparkArgsExtractor {
                 if (opt.startsWith(SECURITY_CREDENTIALS_HBASE)) {
                     addedSecurityCredentialsHBase = true;
                 }
-
+                if (opt.startsWith(OPT_PROPERTIES_FILE)) {
+                    i++;
+                    propertiesFile = sparkOptions.get(i);
+                    addToSparkArgs = false;
+                }
                 if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS) || opt.startsWith(DRIVER_EXTRA_JAVA_OPTIONS)) {
                     if (!opt.contains(LOG4J_CONFIGURATION_JAVA_OPTION)) {
                         opt += " " + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS;
@@ -283,11 +302,7 @@ class SparkArgsExtractor {
             sparkArgs.add(CONF_OPTION);
             sparkArgs.add(DRIVER_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS);
         }
-        final File defaultConfFile = SparkMain.getMatchingFile(SPARK_DEFAULTS_FILE_PATTERN);
-        if (defaultConfFile != null) {
-            sparkArgs.add(OPT_PROPERTIES_FILE);
-            sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString());
-        }
+        mergeAndAddPropertiesFile(sparkArgs, propertiesFile);
 
         if ((yarnClusterMode || yarnClientMode)) {
             final Map<String, URI> fixedFileUrisMap =
@@ -326,6 +341,79 @@ class SparkArgsExtractor {
         return sparkArgs;
     }
 
+    private void mergeAndAddPropertiesFile(final List<String> sparkArgs, final String userDefinedPropertiesFile)
+            throws IOException {
+        final Properties properties = new Properties();
+        loadServerDefaultProperties(properties);
+        loadLocalizedDefaultPropertiesFile(properties);
+        loadUserDefinedPropertiesFile(userDefinedPropertiesFile, properties);
+        final boolean persisted = persistMergedProperties(properties);
+        if (persisted) {
+            sparkArgs.add(OPT_PROPERTIES_FILE);
+            sparkArgs.add(SPARK_DEFAULTS_GENERATED_PROPERTIES);
+        }
+    }
+
+    private boolean persistMergedProperties(final Properties properties) throws IOException {
+        if (!properties.isEmpty()) {
+            try (final Writer writer = new OutputStreamWriter(
+                    new FileOutputStream(new File(SPARK_DEFAULTS_GENERATED_PROPERTIES)), StandardCharsets.UTF_8.name())) {
+                properties.store(writer, "Properties file generated by Oozie");
+                System.out.println(String.format("Persisted merged Spark configs in file %s. Merged properties are: %s",
+                        SPARK_DEFAULTS_GENERATED_PROPERTIES, Arrays.toString(properties.stringPropertyNames().toArray())));
+                return true;
+            } catch (IOException e) {
+                System.err.println(String.format("Could not persist derived Spark config file. Reason: %s", e.getMessage()));
+                throw e;
+            }
+        }
+        return false;
+    }
+
+    private void loadUserDefinedPropertiesFile(final String userDefinedPropertiesFile, final Properties properties) {
+        if (userDefinedPropertiesFile != null) {
+            System.out.println(String.format("Reading Spark config from %s %s...", OPT_PROPERTIES_FILE, userDefinedPropertiesFile));
+            loadProperties(new File(userDefinedPropertiesFile), properties);
+        }
+    }
+
+    private void loadLocalizedDefaultPropertiesFile(final Properties properties) {
+        final File localizedDefaultConfFile = SparkMain.getMatchingFile(SPARK_DEFAULTS_FILE_PATTERN);
+        if (localizedDefaultConfFile != null) {
+            System.out.println(String.format("Reading Spark config from file %s...", localizedDefaultConfFile.getName()));
+            loadProperties(localizedDefaultConfFile, properties);
+        }
+    }
+
+    private void loadServerDefaultProperties(final Properties properties) {
+        final String sparkDefaultsFromServer = actionConf.get(SPARK_DEFAULT_OPTS, "");
+        if (!sparkDefaultsFromServer.isEmpty()) {
+            System.out.println("Reading Spark config propagated from Oozie server...");
+            try (final StringReader reader = new StringReader(sparkDefaultsFromServer)) {
+                properties.load(reader);
+            } catch (IOException e) {
+                System.err.println(String.format("Could not read propagated Spark config! Reason: %s", e.getMessage()));
+            }
+        }
+    }
+
+    private void loadProperties(final File file, final Properties target) {
+        try (final Reader reader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8.name())) {
+            final Properties properties = new Properties();
+            properties.load(reader);
+            for (String key : properties.stringPropertyNames()) {
+                final Object prevProperty = target.setProperty(key, properties.getProperty(key));
+                if (prevProperty != null) {
+                    System.out.println(String.format("Value of %s was overwritten from %s", key, file.getName()));
+                }
+            }
+        } catch (IOException e) {
+            System.err.println(String.format("Could not read Spark configs from file %s. Reason: %s", file.getName(),
+                    e.getMessage()));
+        }
+    }
+
     private void appendWithPathSeparator(final String what, final StringBuilder to) {
         if (to.length() > 0) {
             to.append(File.pathSeparator);

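The option loop above intercepts --properties-file: the flag and its file-name argument are
consumed during the scan (addToSparkArgs is cleared) so that only the merged, generated file is
eventually passed to spark-submit. A standalone sketch of that interception with invented tokens
(the continue-based skip stands in for the addToSparkArgs flag of the real code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class PropertiesFileInterception {
        public static void main(String[] args) {
            final List<String> sparkOptions =
                    Arrays.asList("--properties-file", "user.properties", "--verbose");
            final List<String> forwarded = new ArrayList<>();
            String propertiesFile = null;
            for (int i = 0; i < sparkOptions.size(); i++) {
                final String opt = sparkOptions.get(i);
                if (opt.startsWith("--properties-file") && i + 1 < sparkOptions.size()) {
                    propertiesFile = sparkOptions.get(++i); // consume the file name, too
                    continue;                               // neither token is forwarded
                }
                forwarded.add(opt);
            }
            System.out.println("file to merge: " + propertiesFile); // user.properties
            System.out.println("forwarded:     " + forwarded);      // [--verbose]
        }
    }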
http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
index d97f1f0..bf06d14 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
@@ -45,6 +45,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.StringReader;
 import java.io.Writer;
 
 import java.text.MessageFormat;
@@ -82,12 +83,8 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
         Properties sparkConfProps = new Properties();
         sparkConfProps.setProperty("a", "A");
         sparkConfProps.setProperty("b", "B");
-        FileOutputStream fos = null;
-        try {
-            fos = new FileOutputStream(sparkConf);
+        try (FileOutputStream fos = new FileOutputStream(sparkConf)) {
             sparkConfProps.store(fos, "");
-        } finally {
-            IOUtils.closeSafely(fos);
         }
         SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);
         scs.destroy();
@@ -104,7 +101,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
     }
 
     @SuppressWarnings("unchecked")
-    private void _testSetupMethods(String master, Map<String, String> extraSparkOpts, String mode) throws Exception {
+    private void _testSetupMethods(String master, Map<String, String> defaultSparkOpts, String mode) throws Exception {
         SparkActionExecutor ae = new SparkActionExecutor();
         assertEquals(Arrays.asList(SparkMain.class), ae.getLauncherClasses());
 
@@ -137,16 +134,26 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
         assertEquals(getNameNodeUri() + "/foo.jar", conf.get("oozie.spark.jar"));
         Map<String, String> sparkOpts = new HashMap<String, String>();
         sparkOpts.put("foo", "bar");
-        sparkOpts.putAll(extraSparkOpts);
-        Matcher m = SPARK_OPTS_PATTERN.matcher(conf.get("oozie.spark.spark-opts"));
+        sparkOpts.putAll(defaultSparkOpts);
+        final String configSparkOpts = conf.get("oozie.spark.spark-opts");
         int count = 0;
+        Matcher m = SPARK_OPTS_PATTERN.matcher(configSparkOpts);
         while (m.find()) {
             count++;
             String key = m.group(1);
             String val = m.group(2);
             assertEquals(sparkOpts.get(key), val);
         }
-        assertEquals(sparkOpts.size(), count);
+        if (defaultSparkOpts.size() > 0) {
+            final String defaultConfigSparkOpts = conf.get("oozie.spark.spark-default-opts");
+            Properties p = new Properties();
+            p.load(new StringReader(defaultConfigSparkOpts));
+            for (String key : defaultSparkOpts.keySet()) {
+                count++;
+                assertEquals(sparkOpts.get(key), p.getProperty(key));
+            }
+        }
+        assertEquals(configSparkOpts, sparkOpts.size(), count);
     }
 
     private String getActionXml() {

http://git-wip-us.apache.org/repos/asf/oozie/blob/299370b4/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
index 574bf24..6a9baa5 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
@@ -20,11 +20,19 @@ package org.apache.oozie.action.hadoop;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.junit.After;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Properties;
 
+import static org.apache.oozie.action.hadoop.SparkArgsExtractor.SPARK_DEFAULTS_GENERATED_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 
 public class TestSparkArgsExtractor {
@@ -251,4 +259,77 @@ public class TestSparkArgsExtractor {
                         "arg1"),
                 sparkArgs);
     }
+
+    @Test
+    public void testPropertiesFileMerging() throws Exception {
+        final Configuration actionConf = new Configuration();
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+        actionConf.set(SparkActionExecutor.SPARK_DEFAULT_OPTS, "defaultProperty=1\ndefaultProperty2=2\ndefaultProperty3=3");
+        actionConf.set(SparkActionExecutor.SPARK_OPTS,
+                "--properties-file foo.properties --conf spark.driver.extraJavaOptions=-Xmx234m");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+        createTemporaryFileWithContent("spark-defaults.conf", "foo2=bar2\ndefaultProperty3=44\nfoo3=nobar");
+        createTemporaryFileWithContent("foo.properties", "foo=bar\ndefaultProperty2=4\nfoo3=barbar");
+
+        final String[] mainArgs = {"arg0", "arg1"};
+        final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
+        Properties p = readMergedProperties();
+        assertEquals("property defaultProperty should've been read from server-propagated config",
+                "1", p.get("defaultProperty"));
+        assertEquals("property defaultProperty2 should've been overwritten by user-defined foo.properties",
+                "4", p.get("defaultProperty2"));
+        assertEquals("property defaultProperty3 should've been overwritten by localized spark-defaults.conf",
+                "44", p.get("defaultProperty3"));
+        assertEquals("property foo should've been read from user-defined foo.properties",
+                "bar", p.get("foo"));
+        assertEquals("property foo2 should've been read from localized spark-defaults.conf",
+                "bar2", p.get("foo2"));
+        assertEquals("property foo3 should've been overwritten by user-defined foo.properties",
+                "barbar", p.get("foo3"));
+        assertEquals("Spark args mismatch",
+                Lists.newArrayList("--master", "yarn", "--deploy-mode", "client", "--name", "Spark Copy File",
+                        "--class", "org.apache.oozie.example.SparkFileCopy", "--conf",
+                        "spark.driver.extraJavaOptions=-Xmx234m -Dlog4j.configuration=spark-log4j.properties", "--conf",
+                        "spark.executor.extraClassPath=$PWD/*", "--conf", "spark.driver.extraClassPath=$PWD/*", "--conf",
+                        "spark.yarn.security.tokens.hadoopfs.enabled=false", "--conf",
+                        "spark.yarn.security.tokens.hive.enabled=false", "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+                        "--conf", "spark.yarn.security.credentials.hadoopfs.enabled=false", "--conf",
+                        "spark.yarn.security.credentials.hive.enabled=false", "--conf",
+                        "spark.yarn.security.credentials.hbase.enabled=false", "--conf",
+                        "spark.executor.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--properties-file", "spark-defaults-oozie-generated.properties", "--files",
+                        "spark-log4j.properties,hive-site.xml", "--conf", "spark.yarn.jar=null", "--verbose", "/lib/test.jar",
+                        "arg0", "arg1"),
+                sparkArgs);
+    }
+
+    private Properties readMergedProperties() throws IOException {
+        final File file = new File(SPARK_DEFAULTS_GENERATED_PROPERTIES);
+        file.deleteOnExit();
+        final Properties properties = new Properties();
+        try (final FileReader reader = new FileReader(file)) {
+            properties.load(reader);
+        }
+        return properties;
+    }
+
+    private void createTemporaryFileWithContent(String filename, String content) throws IOException {
+        final File file = new File(filename);
+        file.deleteOnExit();
+        try (final FileWriter fileWriter = new FileWriter(file)) {
+            fileWriter.write(content);
+        }
+    }
+
+    @After
+    public void cleanUp() throws Exception {
+        File f = new File("spark-defaults.conf");
+        if (f.exists()) {
+            f.delete();
+        }
+    }
 }
\ No newline at end of file
