Repository: oozie
Updated Branches:
  refs/heads/master 82925e4d2 -> 5e1c9d362
Revert "OOZIE-2916 Set a job name for the MR Action's child job (asasvari)" This reverts commit 82925e4d21796fd4dc1c9648f00677b98d7dbb81. Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/efc7a822 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/efc7a822 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/efc7a822 Branch: refs/heads/master Commit: efc7a822762453daf8d56c78b7a88343f7b709d3 Parents: 82925e4 Author: Attila Sasvari <asasv...@cloudera.com> Authored: Mon Sep 11 11:47:51 2017 +0200 Committer: Attila Sasvari <asasv...@cloudera.com> Committed: Mon Sep 11 11:47:51 2017 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 22 +--- .../action/hadoop/MapReduceActionExecutor.java | 10 -- .../action/hadoop/TestJavaActionExecutor.java | 1 + release-log.txt | 1 - .../hadoop/TestMapReduceActionExecutor.java | 109 ++++++++++--------- 5 files changed, 66 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/efc7a822/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 2b1cc7d..bca79aa 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -147,7 +147,6 @@ public class JavaActionExecutor extends ActionExecutor { private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain"; private static final String HADOOP_JOB_NAME = "mapred.job.name"; private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); - public 
static final String OOZIE_ACTION_NAME = "oozie.action.name"; private static int maxActionOutputLen; private static int maxExternalStatsSize; @@ -945,7 +944,6 @@ public class JavaActionExecutor extends ActionExecutor { // action job configuration Configuration actionConf = loadHadoopDefaultResources(context, actionXml); - addAppNameContext(action, context); setupActionConf(actionConf, context, actionXml, appPathRoot); LOG.debug("Setting LibFilesArchives "); setLibFilesArchives(context, actionXml, appPathRoot, actionConf); @@ -1074,19 +1072,6 @@ public class JavaActionExecutor extends ActionExecutor { } } - protected void addAppNameContext(WorkflowAction action, Context context) { - String oozieActionName = String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s", - getType(), - context.getWorkflow().getAppName(), - action.getName(), - context.getWorkflow().getId()); - context.setVar(OOZIE_ACTION_NAME, oozieActionName); - } - - protected String getAppName(Context context) { - return context.getVar(OOZIE_ACTION_NAME); - } - private Credentials ensureCredentials(final Credentials credentials) { if (credentials == null) { LOG.debug("No credentials present, creating a new one."); @@ -1144,10 +1129,13 @@ public class JavaActionExecutor extends ActionExecutor { ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); - String appName = getAppName(context); + String jobName = XLog.format( + "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), + context.getWorkflow().getAppName(), actionName, + context.getWorkflow().getId()); appContext.setApplicationId(appId); - appContext.setApplicationName(appName); + appContext.setApplicationName(jobName); appContext.setApplicationType("Oozie Launcher"); Priority pri = Records.newRecord(Priority.class); int priority = 0; // TODO: OYA: Add a constant or a config http://git-wip-us.apache.org/repos/asf/oozie/blob/efc7a822/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java index 22d5526..338e508 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java @@ -25,7 +25,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -51,7 +50,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor { public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable"; private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain"; public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url"; - private static final String MAPREDUCE_JOB_NAME = "mapreduce.job.name"; private XLog log = XLog.getLog(getClass()); public MapReduceActionExecutor() { @@ -163,7 +161,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor { regularMR = true; } } - setJobName(actionConf, context); actionConf = super.setupActionConf(actionConf, context, actionXml, appPath); // For "regular" (not streaming or pipes) MR jobs @@ -208,13 +205,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor { return actionConf; } - private void setJobName(Configuration actionConf, Context context) { - String jobName = getAppName(context); - if (jobName != null) { - actionConf.set(MAPREDUCE_JOB_NAME, jobName.replace("oozie:launcher", "oozie:action")); - } - } - @Override public void end(Context context, WorkflowAction action) throws ActionExecutorException { super.end(context, action); 
http://git-wip-us.apache.org/repos/asf/oozie/blob/efc7a822/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index d1d78fd..ce674ad 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Properties; http://git-wip-us.apache.org/repos/asf/oozie/blob/efc7a822/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 56e955b..82a10aa 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,6 +1,5 @@ -- Oozie 5.0.0 release (trunk - unreleased) -OOZIE-2916 Set a job name for the MR Action's child job (asasvari) OOZIE-2858 HiveMain, ShellMain and SparkMain should not overwrite properties and config files locally (gezapeti) OOZIE-3035 HDFS HA and log aggregation: getting HDFS delegation token from YARN renewer within JavaActionExecutor (andras.piros via pbacsko) OOZIE-3026 fix openjpa enhancer stage during build for logging (dbdist13, andras.piros via pbacsko) http://git-wip-us.apache.org/repos/asf/oozie/blob/efc7a822/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java index 
7237769..2c92f41 100644 --- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java +++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java @@ -53,7 +53,6 @@ import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.streaming.StreamJob; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -469,7 +468,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); String user = conf.get("user.name"); - JobClient jobClient = getHadoopAccessorService().createJobClient(user, conf); + JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); org.apache.hadoop.mapreduce.JobID jobID = TypeConverter.fromYarn( ConverterUtils.toApplicationId(externalChildIDs)); final RunningJob mrJob = jobClient.getJob(JobID.downgrade(jobID)); @@ -516,7 +515,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -533,7 +535,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); 
w.write("dummy\n"); w.write("dummy\n"); - writeDummyInput(fs, outputDir); + Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt"))); + ow.write("dummy\n"); + ow.write("dummy\n"); + ow.close(); String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + @@ -557,7 +562,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); Path jobXml = new Path(getFsTestCaseDir(), "action.xml"); XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString()); @@ -582,7 +590,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -607,7 +618,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString()); conf.setBoolean("oozie.test.throw.exception", true); // causes OozieActionConfiguratorForTest to throw an exception @@ -633,7 +647,10 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -658,7 +675,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -719,7 +739,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -776,37 +799,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { } } - public void testJobNameSetForMapReduceChildren() throws Exception { - Services serv = Services.get(); - serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true); - - final FileSystem fs = getFileSystem(); - final Path inputDir = new Path(getFsTestCaseDir(), "input"); - final Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); - - final String 
actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" - + getNameNodeUri() + "</name-node>" - + getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>"; - - final String extId = _testSubmit(MAP_REDUCE, actionXml); - final ApplicationId appId = ConverterUtils.toApplicationId(extId); - final Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri()); - final String name = getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplicationReport(appId).getName(); - assertTrue(name.contains("oozie:action")); - } - - private void writeDummyInput(FileSystem fs, Path inputDir) throws IOException { - Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); - w.write("dummy\n"); - w.write("dummy\n"); - w.close(); - } - - private HadoopAccessorService getHadoopAccessorService() { - return Services.get().get(HadoopAccessorService.class); - } - public void testMapReduceWithUberJarEnabled() throws Exception { Services serv = Services.get(); boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false); @@ -835,7 +827,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { OutputStream os = fs.create(new Path(getAppPath(), streamingJar)); IOUtils.copyStream(is, os); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" + " <streaming>" + " <mapper>cat</mapper>" @@ -922,7 +917,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + 
Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" + " <pipes>" + " <program>" + programPath @@ -946,7 +944,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); // set user stats write property as true explicitly in the // configuration. @@ -1005,7 +1006,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); // set user stats write property as false explicitly in the // configuration. @@ -1058,7 +1062,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); // set user stats write property as false explicitly in the // configuration. 
@@ -1125,7 +1132,11 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Path inputDir = new Path(getFsTestCaseDir(), "input"); Path outputDir = new Path(getFsTestCaseDir(), "output"); - writeDummyInput(fs, inputDir); + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, + "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); XConfiguration mrConfig = getMapReduceConfig(inputDir.toString(), outputDir.toString()); @@ -1161,8 +1172,8 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { ae.end(context, context.getAction()); assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); - Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri()); - final YarnClient yarnClient = getHadoopAccessorService().createYarnClient(getTestUser(), conf); + Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri()); + final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), conf); ApplicationReport report = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(externalChildIDs)); // Assert Mapred job name has been set assertEquals(mapredJobName, report.getName());