Repository: oozie Updated Branches: refs/heads/master 6c01dc7ff -> 05636799e
OOZIE-3193 Applications are not killed when submitted via subworkflow (kmarton via gezapeti, andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/05636799 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/05636799 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/05636799 Branch: refs/heads/master Commit: 05636799ea12c348474205fb7379d70ab5ef9395 Parents: 6c01dc7 Author: Gezapeti Cseh <gezap...@apache.org> Authored: Wed Aug 1 23:09:52 2018 +0200 Committer: Gezapeti Cseh <gezap...@apache.org> Committed: Wed Aug 1 23:09:52 2018 +0200 ---------------------------------------------------------------------- .../org/apache/oozie/action/ActionExecutor.java | 2 +- .../oozie/action/hadoop/JavaActionExecutor.java | 6 +- .../oozie/action/hadoop/LauncherMainTester.java | 64 ++++- .../hadoop/SleepMapperReducerForTest.java | 68 +++++ .../oozie/TestSubWorkflowActionExecutor.java | 249 +++++++++++++------ release-log.txt | 1 + 6 files changed, 307 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/main/java/org/apache/oozie/action/ActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java index 71bd36a..1770b97 100644 --- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java @@ -603,7 +603,7 @@ public abstract class ActionExecutor { * @return the action yarn tag */ public static String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) { - if (conf.get(OOZIE_ACTION_YARN_TAG) != null) { + if (conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) { return conf.get(OOZIE_ACTION_YARN_TAG) + "@" + action.getName(); } else if (wfJob.getParentId() != null) { http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index a1a9671..8f0f244 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -1777,7 +1777,7 @@ public class JavaActionExecutor extends ActionExecutor { try { Element actionXml = XmlUtils.parseXml(action.getConf()); final Configuration jobConf = createBaseHadoopConf(context, actionXml); - String launcherTag = LauncherHelper.getActionYarnTag(jobConf, context.getWorkflow().getParentId(), action); + String launcherTag = getActionYarnTag(context, action); jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag)); yarnClient = createYarnClient(context, jobConf); if(action.getExternalId() != null) { @@ -1828,6 +1828,10 @@ public class JavaActionExecutor extends ActionExecutor { } } + private String getActionYarnTag(Context context, WorkflowAction action) { + return LauncherHelper.getActionYarnTag(context.getProtoActionConf(), context.getWorkflow().getParentId(), action); + } + private static Set<String> FINAL_STATUS = new HashSet<String>(); static { http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java index fdca570..ada7005 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java @@ -18,14 +18,31 @@ package org.apache.oozie.action.hadoop; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; import java.util.Properties; public class LauncherMainTester { + public static final String JOB_ID_FILE_NAME = "jobID.txt"; + public static void main(String[] args) throws Throwable { if (args.length == 0) { System.out.println("Hello World!"); @@ -84,10 +101,55 @@ public class LauncherMainTester { sm.checkPermission(null, sm.getSecurityContext()); } } - + if(args.length == 3) { + if(args[0].equals("javamapreduce")) { + executeJavaMapReduce(args); + } + } checkAndSleep(args); } + private static void executeJavaMapReduce(String[] args) throws IOException, InterruptedException { + JobConf jConf = createSleepMapperReducerJobConf(); + final Path input = new Path(args[1]); + FileInputFormat.setInputPaths(jConf, input); + FileOutputFormat.setOutputPath(jConf, new Path(args[2])); + writeToFile(input, jConf, "dummy\n", "data.txt"); + JobClient jc = new JobClient(jConf); + System.out.println("Submitting MR job"); + RunningJob job = jc.submitJob(jConf); + System.out.println("Submitted job " + job.getID().toString()); + writeToFile(input, jConf, job.getID().toString(), JOB_ID_FILE_NAME); + job.waitForCompletion(); + jc.monitorAndPrintJob(jConf, job); + if (job.getJobState() != JobStatus.SUCCEEDED) { + System.err.println(job.getJobState() + " job state instead of" + JobStatus.SUCCEEDED); + System.exit(-1); + } + } + + private static JobConf createSleepMapperReducerJobConf() { + JobConf jConf = new JobConf(true); + jConf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml"))); + jConf.setMapperClass(SleepMapperReducerForTest.class); + jConf.setReducerClass(SleepMapperReducerForTest.class); + jConf.setOutputKeyClass(Text.class); + jConf.setOutputValueClass(IntWritable.class); + jConf.setInputFormat(TextInputFormat.class); + jConf.setOutputFormat(TextOutputFormat.class); + jConf.setNumReduceTasks(1); + jConf.set(SleepMapperReducerForTest.SLEEP_TIME_MILLIS_KEY, "60000"); + return jConf; + } + + private static void writeToFile(Path input, JobConf jConf, String content, String fileName) throws IOException { + try (FileSystem fs = FileSystem.get(jConf); + Writer w = new OutputStreamWriter(fs.create(new Path(input, fileName)))) { + w.write(content); + } + System.out.println("Job Id written to file"); + } + private static void checkAndSleep(String args[]) throws InterruptedException { if (args.length == 2 && args[0].equals("sleep")) { long sleepTime = Long.parseLong(args[1]); http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java b/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java new file mode 100644 index 0000000..846441f --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +import java.io.IOException; +import java.util.Iterator; + +public class SleepMapperReducerForTest implements Mapper<Object, Object, Object, Object>, Reducer<Object, Object, Object, Object> { + private long sleepTimeMillis = 20_000l; + public static final String SLEEP_TIME_MILLIS_KEY = "oozie.test.sleep.time.millis"; + + public static void main(String[] args) { + System.out.println("hello!"); + } + + public void close() throws IOException { + } + + @Override + public void map(Object key, Object value, OutputCollector<Object, Object> collector, Reporter reporter) throws IOException { + sleepUninterrupted(sleepTimeMillis, "Mapper sleeping for " + sleepTimeMillis + " millis.", "Mapper woke up"); + } + + @Override + public void reduce(Object key, Iterator<Object> values, OutputCollector<Object, Object> collector, Reporter reporter) + throws IOException { + sleepUninterrupted(sleepTimeMillis, "Reducer sleeping for " + sleepTimeMillis + " millis.", "Reducer woke up"); + } + + @Override + public void configure(JobConf jobConf) { + sleepTimeMillis = jobConf.getLong(SLEEP_TIME_MILLIS_KEY, sleepTimeMillis); + System.out.println("Configuring MR to sleep for" + sleepTimeMillis + " millis."); + + } + + private void sleepUninterrupted(long millis, String preSleepMessage, String postSleepMessage) { + try { + System.out.println(preSleepMessage); + Thread.sleep(millis); + System.out.println(postSleepMessage); + } catch (InterruptedException e) { + } + } +} + http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java index e074d48..893405e 100644 --- a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java @@ -21,15 +21,23 @@ package org.apache.oozie.action.oozie; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.hadoop.ActionExecutorTestCase; import org.apache.oozie.action.hadoop.LauncherMainTester; +import org.apache.oozie.action.hadoop.OozieJobInfo; import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.OozieClientException; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.wf.KillXCommand; import org.apache.oozie.command.wf.SuspendXCommand; import org.apache.oozie.local.LocalOozie; +import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.service.XLogService; @@ -37,8 +45,18 @@ import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.StringReader; +import java.io.Writer; import java.net.URI; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { @@ -483,45 +501,12 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { public void testSubWorkflowSuspend() throws Exception { try { - Path subWorkflowAppPath = getFsTestCaseDir(); - FileSystem fs = getFileSystem(); - Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml"); - Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath)); - writer.write(getLazyWorkflow()); - writer.close(); - - String workflowUri = getTestCaseFileUri("workflow.xml"); - String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"workflow\">" + - "<start to=\"subwf\"/>" + - "<action name=\"subwf\">" + - " <sub-workflow xmlns='uri:oozie:workflow:0.4'>" + - " <app-path>" + subWorkflowAppPath.toString() + "</app-path>" + - " </sub-workflow>" + - " <ok to=\"end\"/>" + - " <error to=\"fail\"/>" + - "</action>" + - "<kill name=\"fail\">" + - " <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" + - "</kill>" + - "<end name=\"end\"/>" + - "</workflow-app>"; - - writeToFile(appXml, workflowUri); + String workflowUri = createSubWorkflowWithLazyAction(true); LocalOozie.start(); final OozieClient wfClient = LocalOozie.getClient(); - Properties conf = wfClient.createConfiguration(); - conf.setProperty(OozieClient.APP_PATH, workflowUri); - conf.setProperty(OozieClient.USER_NAME, getTestUser()); - conf.setProperty("appName", "var-app-name"); - final String jobId = wfClient.submit(conf); - wfClient.start(jobId); + final String jobId = submitWorkflow(workflowUri, wfClient); - waitFor(JOB_TIMEOUT, new Predicate() { - public boolean evaluate() throws Exception { - return (wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) && - (wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING); - } - }); + waitForSubWFtoStart(wfClient, jobId); WorkflowJob wf = wfClient.getJobInfo(jobId); // Suspending subworkflow new SuspendXCommand(wf.getActions().get(1).getExternalId()).call(); @@ -536,6 +521,66 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { } + public void testSubWorkflowKillExternalChild() throws Exception { + try { + LocalOozie.start(); + final String workflowUri = createSubWorkflowWithLazyAction(true); + final OozieClient wfClient = LocalOozie.getClient(); + final String jobId = submitWorkflow(workflowUri, wfClient); + final Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri()); + + waitForSubWFtoStart(wfClient, jobId); + + final ApplicationId externalChildJobId = getChildMRJobApplicationId(conf); + killWorkflow(jobId); + waitUntilYarnAppKilledAndAssertSuccess(externalChildJobId.toString()); + } finally { + LocalOozie.stop(); + } + + } + + private void killWorkflow(String jobId) throws CommandException { + new KillXCommand(jobId).call(); + } + + private ApplicationId getChildMRJobApplicationId(Configuration conf) throws IOException { + final List<ApplicationId> applicationIdList = new ArrayList<>(); + final Path inputDir = new Path(getFsTestCaseDir(), "input"); + final Path wfIDFile = new Path(inputDir, LauncherMainTester.JOB_ID_FILE_NAME); + final FileSystem fs = FileSystem.get(conf); + + // wait until we have the running child MR job's ID from HDFS + waitFor(JOB_TIMEOUT, new ApplicationIdExistsPredicate(fs, wfIDFile)); + + try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(wfIDFile)))) { + String line = reader.readLine(); + JobID.forName(line); + String jobID = line; + String appID = jobID.replace("job", "application"); + ApplicationId id = ConverterUtils.toApplicationId(appID); + applicationIdList.add(id); + } + + assertTrue("Application ID should've been found. No external Child ID was found in " + wfIDFile.toString(), + applicationIdList.size() == 1); + return applicationIdList.get(0); + } + + private void waitForSubWFtoStart(OozieClient wfClient, String jobId) { + waitFor(JOB_TIMEOUT, new SubWorkflowActionRunningPredicate(wfClient,jobId)); + } + + private String submitWorkflow(String workflowUri, OozieClient wfClient) throws OozieClientException { + Properties conf = wfClient.createConfiguration(); + conf.setProperty(OozieClient.APP_PATH, workflowUri); + conf.setProperty(OozieClient.USER_NAME, getTestUser()); + conf.setProperty("appName", "var-app-name"); + final String jobId = wfClient.submit(conf); + wfClient.start(jobId); + return jobId; + } + private void writeToFile(String appXml, String appPath) throws IOException { // TODO Auto-generated method stub File wf = new File(URI.create(appPath)); @@ -554,16 +599,11 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { } } - public String getLazyWorkflow() { + public String getLazyWorkflow(boolean launchMRAction) { return "<workflow-app xmlns='uri:oozie:workflow:0.4' name='app'>" + "<start to='java' />" + " <action name='java'>" + - "<java>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<main-class>" + JavaSleepAction.class.getName() + "</main-class>" + - "<arg>exit0</arg>" + - "</java>" + getAction(launchMRAction) + "<ok to='end' />" + "<error to='fail' />" + "</action>" @@ -574,32 +614,29 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { + "</workflow-app>"; } + private String getAction(boolean launchMRAction) { + Path inputDir = new Path(getFsTestCaseDir(), "input"); + Path outputDir = new Path(getFsTestCaseDir(), "output"); + String javaActionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<main-class>" + JavaSleepAction.class.getName()+ "</main-class>" + + "</java>"; + String javaWithMRActionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<main-class>" + LauncherMainTester.class.getName()+ "</main-class>" + + "<arg>javamapreduce</arg>" + + "<arg>"+inputDir.toString()+"</arg>" + + "<arg>"+outputDir.toString()+"</arg>" + + "</java>"; + String actionXml = launchMRAction ? javaWithMRActionXml : javaActionXml; + return actionXml; + } + public void testSubWorkflowRerun() throws Exception { try { - Path subWorkflowAppPath = getFsTestCaseDir(); - FileSystem fs = getFileSystem(); - Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml"); - Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath)); - writer.write(getLazyWorkflow()); - writer.close(); - - String workflowUri = getTestCaseFileUri("workflow.xml"); - String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"workflow\">" + - "<start to=\"subwf\"/>" + - "<action name=\"subwf\">" + - " <sub-workflow xmlns='uri:oozie:workflow:0.4'>" + - " <app-path>" + subWorkflowAppPath.toString() + "</app-path>" + - " </sub-workflow>" + - " <ok to=\"end\"/>" + - " <error to=\"fail\"/>" + - "</action>" + - "<kill name=\"fail\">" + - " <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" + - "</kill>" + - "<end name=\"end\"/>" + - "</workflow-app>"; - - writeToFile(appXml, workflowUri); + String workflowUri = createSubWorkflowWithLazyAction(false); LocalOozie.start(); final OozieClient wfClient = LocalOozie.getClient(); Properties conf = wfClient.createConfiguration(); @@ -609,12 +646,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { final String jobId = wfClient.submit(conf); wfClient.start(jobId); - waitFor(JOB_TIMEOUT, new Predicate() { - public boolean evaluate() throws Exception { - return (wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) && - (wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING); - } - }); + waitForSubWFtoStart(wfClient, jobId); String subWorkflowExternalId = wfClient.getJobInfo(jobId).getActions().get(1).getExternalId(); wfClient.kill(wfClient.getJobInfo(jobId).getActions().get(1).getExternalId()); @@ -647,6 +679,34 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { } + private String createSubWorkflowWithLazyAction(boolean launchMRAction) throws IOException { + Path subWorkflowAppPath = getFsTestCaseDir(); + FileSystem fs = getFileSystem(); + Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml"); + try (Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath))) { + writer.write(getLazyWorkflow(launchMRAction)); + } + + String workflowUri = getTestCaseFileUri("workflow.xml"); + String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"workflow\">" + + "<start to=\"subwf\"/>" + + "<action name=\"subwf\">" + + " <sub-workflow xmlns='uri:oozie:workflow:1.0'>" + + " <app-path>" + subWorkflowAppPath.toString() + "</app-path>" + + " </sub-workflow>" + + " <ok to=\"end\"/>" + + " <error to=\"fail\"/>" + + "</action>" + + "<kill name=\"fail\">" + + " <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" + + "</kill>" + + "<end name=\"end\"/>" + + "</workflow-app>"; + + writeToFile(appXml, workflowUri); + return workflowUri; + } + public void testParentGlobalConf() throws Exception { try { Path subWorkflowAppPath = createSubWorkflowXml(); @@ -654,12 +714,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { String workflowUri = createTestWorkflowXml(subWorkflowAppPath); LocalOozie.start(); final OozieClient wfClient = LocalOozie.getClient(); - Properties conf = wfClient.createConfiguration(); - conf.setProperty(OozieClient.APP_PATH, workflowUri); - conf.setProperty(OozieClient.USER_NAME, getTestUser()); - conf.setProperty("appName", "var-app-name"); - final String jobId = wfClient.submit(conf); - wfClient.start(jobId); + final String jobId = submitWorkflow(workflowUri, wfClient); waitFor(JOB_TIMEOUT, new Predicate() { public boolean evaluate() throws Exception { @@ -870,4 +925,38 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { + "<end name='end' />" + "</workflow-app>"; } + + private static class ApplicationIdExistsPredicate implements Predicate { + + private final FileSystem fs; + private final Path wfIDFile; + + public ApplicationIdExistsPredicate(FileSystem fs, Path wfIDFile) { + this.fs = fs; + this.wfIDFile = wfIDFile; + } + + @Override + public boolean evaluate() throws Exception { + return fs.exists(wfIDFile) && fs.getFileStatus(wfIDFile).getLen() > 0; + } + } + + private static class SubWorkflowActionRunningPredicate implements Predicate { + private final OozieClient wfClient; + private final String jobId; + + public SubWorkflowActionRunningPredicate(OozieClient wfClient, String jobId) { + this.wfClient = wfClient; + this.jobId = jobId; + } + + @Override + public boolean evaluate() throws Exception { + boolean isSubWfRunning = wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING; + boolean isSubWfActionRunning = wfClient.getJobInfo(jobId) + .getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING; + return isSubWfRunning && isSubWfActionRunning; + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index f04eeb2..48dbceb 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3193 Applications are not killed when submitted via subworkflow (kmarton via gezapeti, andras.piros) OOZIE-3310 SQL error during /v2/sla filtering (asalamon74 via andras.piros) OOZIE-2942 [examples] Fix Findbugs warnings (Jan Hentschel, kmarton via andras.piros) OOZIE-2718 Improve -dryrun for bundles (zhengxb2005, asalamon74 via andras.piros)