OOZIE-1978 Forkjoin validation code is ridiculously slow in some cases (pbacsko via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8e9b9042 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8e9b9042 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8e9b9042 Branch: refs/heads/master Commit: 8e9b9042b3270dc5ff975c44a5c977fcc41250e4 Parents: 5c89163 Author: Robert Kanter <rkan...@cloudera.com> Authored: Wed Sep 21 17:44:21 2016 -0700 Committer: Robert Kanter <rkan...@cloudera.com> Committed: Wed Sep 21 17:44:21 2016 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/oozie/ErrorCode.java | 4 +- .../org/apache/oozie/service/ActionService.java | 5 + .../workflow/lite/LiteWorkflowAppParser.java | 364 +---- .../workflow/lite/LiteWorkflowValidator.java | 351 +++++ .../oozie/command/wf/TestSubmitXCommand.java | 2 +- .../lite/TestLiteWorkflowAppParser.java | 112 +- core/src/test/resources/wf-long.xml | 1456 ++++++++++++++++++ release-log.txt | 1 + 8 files changed, 1914 insertions(+), 381 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index 2907ca2..8fc3835 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -113,7 +113,7 @@ public enum ErrorCode { E0704(XLog.STD, "Definition already complete, application [{0}]"), E0705(XLog.STD, "Nnode already defined, node [{0}]"), E0706(XLog.STD, "Node cannot transition to itself node [{0}]"), - E0707(XLog.STD, "Loop detected at parsing, node [{0}]"), + E0707(XLog.STD, "Loop detected at parsing, node [{0}], path [{1}]"), E0708(XLog.STD, "Invalid transition, node [{0}] transition [{1}]"), 
E0709(XLog.STD, "Loop detected at runtime, node [{0}]"), E0710(XLog.STD, "Could not read the workflow definition, {0}"), @@ -153,6 +153,8 @@ public enum ErrorCode { E0744(XLog.STD, "A fork, [{0}], is not allowed to have multiple transitions to the same node, [{1}]"), E0755(XLog.STD, "Workflow Job Rerun Error: {0}"), E0756(XLog.STD, "Exception parsing Kill node message [{0}]"), + E0757(XLog.STD, "Fork node [{0}] has multiple joins: [{1}]"), + E0758(XLog.STD, "Join node [{0}] has multiple forks: [{1}]"), E0800(XLog.STD, "Action it is not running its in [{1}] state, action [{0}]"), E0801(XLog.STD, "Workflow already running, workflow [{0}]"), http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/main/java/org/apache/oozie/service/ActionService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/ActionService.java b/core/src/main/java/org/apache/oozie/service/ActionService.java index becc69b..a739a19 100644 --- a/core/src/main/java/org/apache/oozie/service/ActionService.java +++ b/core/src/main/java/org/apache/oozie/service/ActionService.java @@ -138,6 +138,11 @@ public class ActionService implements Service, Instrumentable { return (executorClass != null) ? 
(ActionExecutor) ReflectionUtils.newInstance(executorClass, null) : null; } + public boolean hasActionType(String actionType) { + ParamChecker.notEmpty(actionType, "actionType"); + return executors.containsKey(actionType); + } + Set<String> getActionTypes() { return executors.keySet(); } http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java index 0541634..a74e5c7 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java @@ -18,57 +18,51 @@ package org.apache.oozie.workflow.lite; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.Deflater; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.Inflater; +import java.util.zip.InflaterInputStream; + +import javax.xml.transform.stream.StreamSource; +import javax.xml.validation.Schema; +import javax.xml.validation.Validator; + import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Writable; -import org.apache.oozie.action.hadoop.FsActionExecutor; import org.apache.oozie.ErrorCode; import org.apache.oozie.action.ActionExecutor; +import org.apache.oozie.action.hadoop.FsActionExecutor; import 
org.apache.oozie.action.oozie.SubWorkflowActionExecutor; +import org.apache.oozie.service.ActionService; import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.Services; import org.apache.oozie.util.ELUtils; import org.apache.oozie.util.IOUtils; -import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XmlUtils; -import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.ParameterVerifier; import org.apache.oozie.util.ParameterVerifierException; import org.apache.oozie.util.WritableUtils; -import org.apache.oozie.ErrorCode; +import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XmlUtils; import org.apache.oozie.workflow.WorkflowException; -import org.apache.oozie.action.ActionExecutor; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.ActionService; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.jdom.Element; import org.jdom.JDOMException; import org.jdom.Namespace; import org.xml.sax.SAXException; -import javax.xml.transform.stream.StreamSource; -import javax.xml.validation.Schema; -import javax.xml.validation.Validator; - -import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; -import java.io.StringWriter; -import java.io.ByteArrayOutputStream; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Deque; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.zip.*; - /** * Class to parse and validate workflow xml */ @@ -124,29 +118,6 @@ public class LiteWorkflowAppParser { private Class<? extends DecisionNodeHandler> decisionHandlerClass; private Class<? 
extends ActionNodeHandler> actionHandlerClass; - private static enum VisitStatus { - VISITING, VISITED - } - - /** - * We use this to store a node name and its top (eldest) decision parent node name for the forkjoin validation - */ - class NodeAndTopDecisionParent { - String node; - String topDecisionParent; - - public NodeAndTopDecisionParent(String node, String topDecisionParent) { - this.node = node; - this.topDecisionParent = topDecisionParent; - } - } - - private List<String> forkList = new ArrayList<String>(); - private List<String> joinList = new ArrayList<String>(); - private StartNodeDef startNode; - private List<NodeAndTopDecisionParent> visitedOkNodes = new ArrayList<NodeAndTopDecisionParent>(); - private List<String> visitedJoinNodes = new ArrayList<String>(); - private String defaultNameNode; private String defaultJobTracker; @@ -201,14 +172,18 @@ public class LiteWorkflowAppParser { Element wfDefElement = XmlUtils.parseXml(strDef); ParameterVerifier.verifyParameters(jobConf, wfDefElement); LiteWorkflowApp app = parse(strDef, wfDefElement, configDefault, jobConf); - Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>(); - traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING); - validate(app, app.getNode(StartNodeDef.START), traversed); - //Validate whether fork/join are in pair or not + + + boolean validateForkJoin = false; + if (jobConf.getBoolean(WF_VALIDATE_FORK_JOIN, true) && ConfigurationService.getBoolean(VALIDATE_FORK_JOIN)) { - validateForkJoin(app); + validateForkJoin = true; } + + LiteWorkflowValidator validator = new LiteWorkflowValidator(); + validator.validateWorkflow(app, validateForkJoin); + return app; } catch (ParameterVerifierException ex) { @@ -226,201 +201,6 @@ public class LiteWorkflowAppParser { } /** - * Validate whether fork/join are in pair or not - * @param app LiteWorkflowApp - * @throws WorkflowException - */ - private void validateForkJoin(LiteWorkflowApp app) throws 
WorkflowException { - // Make sure the number of forks and joins in wf are equal - if (forkList.size() != joinList.size()) { - throw new WorkflowException(ErrorCode.E0730); - } - - // No need to bother going through all of this if there are no fork/join nodes - if (!forkList.isEmpty()) { - visitedOkNodes.clear(); - visitedJoinNodes.clear(); - validateForkJoin(startNode, app, new LinkedList<String>(), new LinkedList<String>(), new LinkedList<String>(), true, - null); - } - } - - /* - * Recursively walk through the DAG and make sure that all fork paths are valid. - * This should be called from validateForkJoin(LiteWorkflowApp app). It assumes that visitedOkNodes and visitedJoinNodes are - * both empty ArrayLists on the first call. - * - * @param node the current node; use the startNode on the first call - * @param app the WorkflowApp - * @param forkNodes a stack of the current fork nodes - * @param joinNodes a stack of the current join nodes - * @param path a stack of the current path - * @param okTo false if node (or an ancestor of node) was gotten to via an "error to" transition or via a join node that has - * already been visited at least once before - * @param topDecisionParent The top (eldest) decision node along the path to this node, or null if there isn't one - * @throws WorkflowException - */ - private void validateForkJoin(NodeDef node, LiteWorkflowApp app, Deque<String> forkNodes, Deque<String> joinNodes, - Deque<String> path, boolean okTo, String topDecisionParent) throws WorkflowException { - if (path.contains(node.getName())) { - // cycle - throw new WorkflowException(ErrorCode.E0741, node.getName(), Arrays.toString(path.toArray())); - } - path.push(node.getName()); - - // Make sure that we're not revisiting a node (that's not a Kill, Join, or End type) that's been visited before from an - // "ok to" transition; if its from an "error to" transition, then its okay to visit it multiple times. 
Also, because we - // traverse through join nodes multiple times, we have to make sure not to throw an exception here when we're really just - // re-walking the same execution path (this is why we need the visitedJoinNodes list used later) - if (okTo && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) { - NodeAndTopDecisionParent natdp = findInVisitedOkNodes(node.getName()); - if (natdp != null) { - // However, if we've visited the node and it's under a decision node, we may be seeing it again and it's only - // illegal if that decision node is not the same as what we're seeing now (because during execution we only go - // down one path of the decision node, so while we're seeing the node multiple times here, during runtime it will - // only be executed once). Also, this decision node should be the top (eldest) decision node. As null indicates - // that there isn't a decision node, when this happens they must both be null to be valid. Here is a good example - // to visualize a node ("actionX") that has three "ok to" paths to it, but should still be a valid workflow (it may - // be easier to see if you draw it): - // decisionA --> {actionX, decisionB} - // decisionB --> {actionX, actionY} - // actionY --> {actionX} - // And, if we visit this node twice under the same decision node in an invalid way, the path cycle checking code - // will catch it, so we don't have to worry about that here. 
- if ((natdp.topDecisionParent == null && topDecisionParent == null) - || (natdp.topDecisionParent == null && topDecisionParent != null) - || (natdp.topDecisionParent != null && topDecisionParent == null) - || !natdp.topDecisionParent.equals(topDecisionParent)) { - // If we get here, then we've seen this node before from an "ok to" transition but they don't have the same - // decision node top parent, which means that this node will be executed twice, which is illegal - throw new WorkflowException(ErrorCode.E0743, node.getName()); - } - } - else { - // If we haven't transitioned to this node before, add it and its top decision parent node - visitedOkNodes.add(new NodeAndTopDecisionParent(node.getName(), topDecisionParent)); - } - } - - if (node instanceof StartNodeDef) { - String transition = node.getTransitions().get(0); // start always has only 1 transition - NodeDef tranNode = app.getNode(transition); - validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, topDecisionParent); - } - else if (node instanceof ActionNodeDef) { - String transition = node.getTransitions().get(0); // "ok to" transition - NodeDef tranNode = app.getNode(transition); - validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, topDecisionParent); // propogate okTo - transition = node.getTransitions().get(1); // "error to" transition - tranNode = app.getNode(transition); - validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false, topDecisionParent); // use false - } - else if (node instanceof DecisionNodeDef) { - for(String transition : (new HashSet<String>(node.getTransitions()))) { - NodeDef tranNode = app.getNode(transition); - // if there currently isn't a topDecisionParent (i.e. 
null), then use this node instead of propagating null - String parentDecisionNode = topDecisionParent; - if (parentDecisionNode == null) { - parentDecisionNode = node.getName(); - } - validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, parentDecisionNode); - } - } - else if (node instanceof ForkNodeDef) { - forkNodes.push(node.getName()); - List<String> transitionsList = node.getTransitions(); - HashSet<String> transitionsSet = new HashSet<String>(transitionsList); - // Check that a fork doesn't go to the same node more than once - if (!transitionsList.isEmpty() && transitionsList.size() != transitionsSet.size()) { - // Now we have to figure out which node is the problem and what type of node they are (join and kill are ok) - for (int i = 0; i < transitionsList.size(); i++) { - String a = transitionsList.get(i); - NodeDef aNode = app.getNode(a); - if (!(aNode instanceof JoinNodeDef) && !(aNode instanceof KillNodeDef)) { - for (int k = i+1; k < transitionsList.size(); k++) { - String b = transitionsList.get(k); - if (a.equals(b)) { - throw new WorkflowException(ErrorCode.E0744, node.getName(), a); - } - } - } - } - } - for(String transition : transitionsSet) { - NodeDef tranNode = app.getNode(transition); - validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, topDecisionParent); - } - forkNodes.pop(); - if (!joinNodes.isEmpty()) { - joinNodes.pop(); - } - } - else if (node instanceof JoinNodeDef) { - if (forkNodes.isEmpty()) { - // no fork for join to match with - throw new WorkflowException(ErrorCode.E0742, node.getName()); - } - if (forkNodes.size() > joinNodes.size() && (joinNodes.isEmpty() || !joinNodes.peek().equals(node.getName()))) { - joinNodes.push(node.getName()); - } - if (!joinNodes.peek().equals(node.getName())) { - // join doesn't match fork - throw new WorkflowException(ErrorCode.E0732, forkNodes.peek(), node.getName(), joinNodes.peek()); - } - joinNodes.pop(); - String currentForkNode = forkNodes.pop(); - String transition 
= node.getTransitions().get(0); // join always has only 1 transition - NodeDef tranNode = app.getNode(transition); - // If we're already under a situation where okTo is false, use false (propogate it) - // Or if we've already visited this join node, use false (because we've already traversed this path before and we don't - // want to throw an exception from the check against visitedOkNodes) - if (!okTo || visitedJoinNodes.contains(node.getName())) { - validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false, topDecisionParent); - // Else, use true because this is either the first time we've gone through this join node or okTo was already false - } else { - visitedJoinNodes.add(node.getName()); - validateForkJoin(tranNode, app, forkNodes, joinNodes, path, true, topDecisionParent); - } - forkNodes.push(currentForkNode); - joinNodes.push(node.getName()); - } - else if (node instanceof KillNodeDef) { - // do nothing - } - else if (node instanceof EndNodeDef) { - if (!forkNodes.isEmpty()) { - path.pop(); // = node - String parent = path.peek(); - // can't go to an end node in a fork - throw new WorkflowException(ErrorCode.E0737, parent, node.getName()); - } - } - else { - // invalid node type (shouldn't happen) - throw new WorkflowException(ErrorCode.E0740, node.getName()); - } - path.pop(); - } - - /** - * Return a {@link NodeAndTopDecisionParent} whose {@link NodeAndTopDecisionParent#node} is equal to the passed in name, or null - * if it isn't in the {@link LiteWorkflowAppParser#visitedOkNodes} list. 
- * - * @param name The name to search for - * @return a NodeAndTopDecisionParent or null - */ - private NodeAndTopDecisionParent findInVisitedOkNodes(String name) { - NodeAndTopDecisionParent natdp = null; - for (NodeAndTopDecisionParent v : visitedOkNodes) { - if (v.node.equals(name)) { - natdp = v; - break; - } - } - return natdp; - } - - /** * Parse xml to {@link LiteWorkflowApp} * * @param strDef @@ -573,76 +353,6 @@ public class LiteWorkflowAppParser { return Base64.encodeBase64String(baos.toByteArray()); } - /** - * Validate workflow xml - * - * @param app - * @param node - * @param traversed - * @throws WorkflowException - */ - private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException { - if (node instanceof StartNodeDef) { - startNode = (StartNodeDef) node; - } - else { - try { - ParamChecker.validateActionName(node.getName()); - } - catch (IllegalArgumentException ex) { - throw new WorkflowException(ErrorCode.E0724, ex.getMessage()); - } - } - if (node instanceof ActionNodeDef) { - try { - Element action = XmlUtils.parseXml(node.getConf()); - boolean supportedAction = Services.get().get(ActionService.class).getExecutor(action.getName()) != null; - if (!supportedAction) { - throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName()); - } - } - catch (JDOMException ex) { - throw new RuntimeException("It should never happen, " + ex.getMessage(), ex); - } - } - - if(node instanceof ForkNodeDef){ - forkList.add(node.getName()); - } - - if(node instanceof JoinNodeDef){ - joinList.add(node.getName()); - } - - if (node instanceof EndNodeDef) { - traversed.put(node.getName(), VisitStatus.VISITED); - return; - } - if (node instanceof KillNodeDef) { - traversed.put(node.getName(), VisitStatus.VISITED); - return; - } - for (String transition : node.getTransitions()) { - - if (app.getNode(transition) == null) { - throw new WorkflowException(ErrorCode.E0708, node.getName(), transition); - } 
- - //check if it is a cycle - if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITING) { - throw new WorkflowException(ErrorCode.E0707, app.getNode(transition).getName()); - } - //ignore validated one - if (traversed.get(app.getNode(transition).getName()) == VisitStatus.VISITED) { - continue; - } - - traversed.put(app.getNode(transition).getName(), VisitStatus.VISITING); - validate(app, app.getNode(transition), traversed); - } - traversed.put(node.getName(), VisitStatus.VISITED); - } - private void addChildElement(Element parent, Namespace ns, String childName, String childValue) { Element child = new Element(childName, ns); child.setText(childValue); http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowValidator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowValidator.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowValidator.java new file mode 100644 index 0000000..24c05af --- /dev/null +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowValidator.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.workflow.lite; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.service.ActionService; +import org.apache.oozie.service.Services; +import org.apache.oozie.util.ParamChecker; +import org.apache.oozie.util.XmlUtils; +import org.apache.oozie.workflow.WorkflowException; +import org.jdom.Element; +import org.jdom.JDOMException; + +import com.google.common.base.Joiner; +import com.google.common.base.Objects; +import com.google.common.base.Optional; + +public class LiteWorkflowValidator { + + public void validateWorkflow(LiteWorkflowApp app, boolean validateForkJoin) throws WorkflowException { + NodeDef startNode = app.getNode(StartNodeDef.START); + if (startNode == null) { + throw new WorkflowException(ErrorCode.E0700, "no start node"); // shouldn't happen, but just in case... 
+ } + + ForkJoinCount forkJoinCount = new ForkJoinCount(); + + performBasicValidation(app, startNode, new ArrayDeque<String>(), new HashSet<NodeDef>(), forkJoinCount); + + if (validateForkJoin) { + // don't validate fork/join pairs if the number of forks and joins mismatch + if (forkJoinCount.forks != forkJoinCount.joins) { + throw new WorkflowException(ErrorCode.E0730); + } + + validateForkJoin(app, + startNode, + null, + null, + true, + new ArrayDeque<String>(), + new HashMap<String, String>(), + new HashMap<String, Optional<String>>()); + } + } + + /** + * Basic recursive validation of the workflow: + * - it is acyclic, no loops + * - names of the actions follow a specific pattern + * - all nodes have valid transitions + * - it only has supported action nodes + * - there is no node that points to itself + * - counts fork/join nodes + * + * @param app The WorkflowApp + * @param node Current node we're checking + * @param path The list of nodes that we've visited so far in this call chain + * @param checkedNodes The list of nodes that we've already checked. 
For example, if it's a decision node, then we + * don't have to re-walk the entire path because it indicates that it has been done before on a separate path + * @param forkJoinCount Number of fork and join nodes + * @throws WorkflowException If any of the constraints described above is violated + */ + private void performBasicValidation(LiteWorkflowApp app, NodeDef node, Deque<String> path, Set<NodeDef> checkedNodes, + ForkJoinCount forkJoinCount) throws WorkflowException { + String nodeName = node.getName(); + + checkActionName(node); + if (node instanceof ActionNodeDef) { + checkActionNode(node); + } else if (node instanceof ForkNodeDef) { + forkJoinCount.forks++; + } else if (node instanceof JoinNodeDef) { + forkJoinCount.joins++; + } + checkCycle(path, nodeName); + + path.addLast(nodeName); + + List<String> transitions = node.getTransitions(); + // Get all transitions and walk the workflow recursively + if (!transitions.isEmpty()) { + for (final String t : transitions) { + NodeDef transitionNode = app.getNode(t); + if (transitionNode == null) { + throw new WorkflowException(ErrorCode.E0708, node.getName(), t); + } + + if (!checkedNodes.contains(transitionNode)) { + performBasicValidation(app, transitionNode, path, checkedNodes, forkJoinCount); + checkedNodes.add(transitionNode); + } + } + } + + path.remove(nodeName); + } + + /** + * This method recursively validates two things: + * - fork/join nodes are properly paired + * - there are no multiple "okTo" paths to a given node + * + * Important: this method assumes that the workflow is acyclic - therefore this must run after performBasicValidation() + * + * @param app The WorkflowApp + * @param node Current node we're checking + * @param currentFork Current fork node (null if we are not under a fork path) + * @param topDecisionParent The top (eldest) decision node along the path to this node, or null if there isn't one + * @param okPath false if node (or an ancestor of node) was gotten to via an 
"error to" transition or via a join node that has + * already been visited at least once before + * @param forkJoins Map that contains a mapping of fork-join node pairs. + * @param nodeAndDecisionParents Map that contains a mapping of nodes and their eldest decision node + * @throws WorkflowException If there is any of the constraints described above is violated + */ + private void validateForkJoin(LiteWorkflowApp app, NodeDef node, NodeDef currentFork, String topDecisionParent, + boolean okPath, Deque<String> path, Map<String, String> forkJoins, + Map<String, Optional<String>> nodeAndDecisionParents) throws WorkflowException { + final String nodeName = node.getName(); + + path.addLast(nodeName); + + /* If we're walking an "okTo" path and the nodes are not Kill/Join/End, we have to make sure that only a single + * "okTo" path exists to the current node. + * + * The "topDecisionParent" represents the eldest decision in the chain that we've gone through. For example, let's assume + * that D1, D2, D3 are decision nodes and A is an action node. + * + * D1-->D2-->D3---> ... (rest of the WF) + * | | | + * | | | + * | | +----> +---+ + * | +---------> | A | + * +-------------> +---+ + * + * In this case, there are three "okTo" paths to "A" but it's still a valid workflow because the eldest decision node + * is D1 and during every run, there is only one possible execution path that leads to A (D1->A, D1->D2->A or + * (D1->D2->D3->A). In the code, if we encounter a decision node and we already have one, we don't update it. If it's null + * then we set it to the current decision node we're under. + * + * If the "current" and "top" parents are null, it means that we reached the node from two separate "okTo" paths, which is + * not acceptable. + * + * Also, if we have two distinct top decision parents it means that the node is reachable from two decision paths which + * are not "chained" (like in the example). 
+ * + * It's worth noting that the last two examples can only occur in case of fork-join when we start to execute at least + * two separate paths in parallel. Without fork-join, multiple parents or two null parents would mean that there is a loop + * in the workflow but that should not happen since it has been validated. + */ + if (okPath && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) { + // using Optional here so we can distinguish between "non-visited" and "visited - no parent" state. + Optional<String> decisionParentOpt = nodeAndDecisionParents.get(nodeName); + if (decisionParentOpt == null) { + nodeAndDecisionParents.put(node.getName(), Optional.fromNullable(topDecisionParent)); + } else { + String decisionParent = decisionParentOpt.isPresent() ? decisionParentOpt.get() : null; + + if ((decisionParent == null && topDecisionParent == null) || !Objects.equal(decisionParent, topDecisionParent)) { + throw new WorkflowException(ErrorCode.E0743, nodeName); + } + } + } + + /* Fork-Join validation logic: + * + * At each Fork node, we recurse to every possible paths, changing the "currentFork" variable to the Fork node. We stop + * walking as soon as we encounter a Join node. At the Join node, we update the forkJoin mapping, which maintains + * the relationship between every fork-join pair (actually it's join->fork mapping). We check whether the join->fork + * mapping already contains another Fork node, which means that the Join is reachable from at least two distinct + * Fork nodes, so we terminate the validation. + * + * From the Join node, we don't recurse further. Therefore, all recursive calls return back to the point where we called + * validateForkJoin() from the Fork node in question. + * + * At this point, we have to check how many different Join nodes we've found at each different paths. We collect them to + * a set, then we make sure that we have only a single Join node for all Fork paths. 
Otherwise the workflow is broken. + * + * If we have only a single Join, then we get the transition node from the Join and go on with the recursive validation - + * this time we use the original "currentFork" variable that we have on the stack. With this approach, nested + * Fork-Joins are handled correctly. + */ + if (node instanceof ForkNodeDef) { + final List<String> transitions = node.getTransitions(); + + checkForkTransitions(app, transitions, node); + + for (String t : transitions) { + NodeDef transition = app.getNode(t); + validateForkJoin(app, transition, node, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents); + } + + // get the Join node for this ForkNode & validate it (we must have only one) + Set<String> joins = new HashSet<String>(); + collectJoins(app, forkJoins, nodeName, joins); + checkJoins(joins, nodeName); + + List<String> joinTransitions = app.getNode(joins.iterator().next()).getTransitions(); + NodeDef next = app.getNode(joinTransitions.get(0)); + + validateForkJoin(app, next, currentFork, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents); + } else if (node instanceof JoinNodeDef) { + if (currentFork == null) { + throw new WorkflowException(ErrorCode.E0742, node.getName()); + } + + // join --> fork mapping + String forkNode = forkJoins.get(nodeName); + if (forkNode == null) { + forkJoins.put(nodeName, currentFork.getName()); + } else if (!forkNode.equals(currentFork.getName())) { + throw new WorkflowException(ErrorCode.E0758, node.getName(), forkNode + "," + currentFork); + } + } else if (node instanceof DecisionNodeDef) { + List<String> transitions = node.getTransitions(); + + // see explanation above - if we already have a topDecisionParent, we don't update it + String parentDecisionNode = topDecisionParent; + if (parentDecisionNode == null) { + parentDecisionNode = nodeName; + } + + for (String t : transitions) { + NodeDef transition = app.getNode(t); + validateForkJoin(app, transition, currentFork, 
parentDecisionNode, okPath, path, forkJoins, + nodeAndDecisionParents); + } + } else if (node instanceof KillNodeDef) { + // no op + } else if (node instanceof EndNodeDef) { + // We can't end the WF if we're on a Fork path. From the "path" deque, we remove the last node (which + // is the current "End") and look at last node again so we know where we came from + if (currentFork != null) { + path.removeLast(); + String previous = path.peekLast(); + throw new WorkflowException(ErrorCode.E0737, previous, node.getName()); + } + } else if (node instanceof ActionNodeDef) { + String transition = node.getTransitions().get(0); // "ok to" transition + NodeDef okNode = app.getNode(transition); + validateForkJoin(app, okNode, currentFork, topDecisionParent, true, path, forkJoins, nodeAndDecisionParents); + + transition = node.getTransitions().get(1); // "error to" transition + NodeDef errorNode = app.getNode(transition); + validateForkJoin(app, errorNode, currentFork, topDecisionParent, false, path, forkJoins, nodeAndDecisionParents); + } else if (node instanceof StartNodeDef) { + String transition = node.getTransitions().get(0); // start always has only 1 transition + NodeDef tranNode = app.getNode(transition); + validateForkJoin(app, tranNode, currentFork, topDecisionParent, okPath, path, forkJoins, nodeAndDecisionParents); + } else { + throw new WorkflowException(ErrorCode.E0740, node.getClass()); + } + + path.remove(nodeName); + } + + private void checkActionName(NodeDef node) throws WorkflowException { + if (!(node instanceof StartNodeDef)) { + try { + ParamChecker.validateActionName(node.getName()); + } catch (IllegalArgumentException ex) { + throw new WorkflowException(ErrorCode.E0724, ex.getMessage()); + } + } + } + + private void checkActionNode(NodeDef node) throws WorkflowException { + try { + Element action = XmlUtils.parseXml(node.getConf()); + ActionService actionService = Services.get().get(ActionService.class); + boolean supportedAction = 
actionService.hasActionType(action.getName()); + if (!supportedAction) { + throw new WorkflowException(ErrorCode.E0723, node.getName(), action.getName()); + } + } catch (JDOMException ex) { + throw new WorkflowException(ErrorCode.E0700, "JDOMException: " + ex.getMessage()); + } + } + + private void checkCycle(Deque<String> path, String nodeName) throws WorkflowException { + if (path.contains(nodeName)) { + path.addLast(nodeName); + throw new WorkflowException(ErrorCode.E0707, nodeName, Joiner.on("->").join(path)); + } + } + + // Check that a fork doesn't go to the same node more than once + private void checkForkTransitions(LiteWorkflowApp app, List<String> transitionsList, NodeDef node) throws WorkflowException { + for (final String t : transitionsList) { + NodeDef aNode = app.getNode(t); + // Now we have to figure out which node is the problem and what type of node they are (join and kill are ok) + if (!(aNode instanceof JoinNodeDef) && !(aNode instanceof KillNodeDef)) { + int count = CollectionUtils.cardinality(t, transitionsList); + + if (count > 1) { + throw new WorkflowException(ErrorCode.E0744, node.getName(), t); + } + } + } + } + + private void collectJoins(LiteWorkflowApp app, Map<String, String> forkJoinPairs, String nodeName, Set<String> joins) { + for (Entry<String, String> entry : forkJoinPairs.entrySet()) { + if (entry.getValue().equals(nodeName)) { + joins.add(app.getNode(entry.getKey()).getName()); + } + } + } + + private void checkJoins(Set<String> joinNodes, String forkName) throws WorkflowException { + if (joinNodes.size() == 0) { + throw new WorkflowException(ErrorCode.E0733, forkName); + } + + if (joinNodes.size() > 1) { + throw new WorkflowException(ErrorCode.E0757, forkName, Joiner.on(",").join(joinNodes)); + } + } + + // Tiny utility class where we keep track of how many fork and join nodes we have found + private class ForkJoinCount { + int forks = 0; + int joins = 0; + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java index 3c893d0..47ff8ca 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java @@ -215,7 +215,7 @@ public class TestSubmitXCommand extends XDataTestCase { fail("Should have gotten E0707 because the XML has a loop"); } catch (CommandException ce) { assertEquals(ErrorCode.E0707, ce.getErrorCode()); - assertEquals("E0707: Loop detected at parsing, node [a]", ce.getMessage()); + assertEquals("E0707: Loop detected at parsing, node [a], path [:start:->a->c->a]", ce.getMessage()); } conf = new XConfiguration(); http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java b/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java index 9002b6c..9e439b4 100644 --- a/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java +++ b/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java @@ -20,27 +20,20 @@ package org.apache.oozie.workflow.lite; import java.io.StringReader; -import java.lang.reflect.Field; -import java.lang.reflect.Method; import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.oozie.service.ActionService; +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.ErrorCode; import 
org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.LiteWorkflowStoreService; import org.apache.oozie.service.SchemaService; import org.apache.oozie.service.Services; -import org.apache.oozie.service.TestLiteWorkflowAppService; +import org.apache.oozie.test.XTestCase; +import org.apache.oozie.util.IOUtils; import org.apache.oozie.workflow.WorkflowException; import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestActionNodeHandler; import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestDecisionNodeHandler; -import org.apache.oozie.test.XTestCase; -import org.apache.oozie.util.IOUtils; -import org.apache.oozie.ErrorCode; -import org.apache.oozie.action.hadoop.DistcpActionExecutor; -import org.apache.oozie.action.hadoop.HiveActionExecutor; -import org.apache.hadoop.conf.Configuration; public class TestLiteWorkflowAppParser extends XTestCase { public static String dummyConf = "<java></java>"; @@ -399,7 +392,7 @@ public class TestLiteWorkflowAppParser extends XTestCase { // No default NN is set try { - LiteWorkflowApp app = parser.validateAndParse(IOUtils.getResourceAsReader("wf-schema-no-namenode.xml", -1), + parser.validateAndParse(IOUtils.getResourceAsReader("wf-schema-no-namenode.xml", -1), new Configuration()); fail(); } catch (WorkflowException e) { @@ -501,7 +494,7 @@ public class TestLiteWorkflowAppParser extends XTestCase { // No default NN is set try { - LiteWorkflowApp app = parser.validateAndParse(IOUtils.getResourceAsReader("wf-schema-no-jobtracker.xml", -1), + parser.validateAndParse(IOUtils.getResourceAsReader("wf-schema-no-jobtracker.xml", -1), new Configuration()); fail(); } catch (WorkflowException e) { @@ -780,8 +773,7 @@ public class TestLiteWorkflowAppParser extends XTestCase { try { invokeForkJoin(parser, def); fail("Expected to catch an exception but did not encounter any"); - } catch (Exception ex) { - WorkflowException we = (WorkflowException) ex.getCause(); + } catch (WorkflowException we) { 
assertEquals(ErrorCode.E0737, we.getErrorCode()); // Make sure the message contains the nodes involved in the invalid transition to end assertTrue(we.getMessage().contains("node [three]")); @@ -826,8 +818,7 @@ public class TestLiteWorkflowAppParser extends XTestCase { try { invokeForkJoin(parser, def); fail("Expected to catch an exception but did not encounter any"); - } catch (Exception ex) { - WorkflowException we = (WorkflowException) ex.getCause(); + } catch (WorkflowException we) { assertEquals(ErrorCode.E0742, we.getErrorCode()); assertTrue(we.getMessage().contains("[j2]")); } @@ -861,13 +852,11 @@ public class TestLiteWorkflowAppParser extends XTestCase { try { invokeForkJoin(parser, def); fail("Expected to catch an exception but did not encounter any"); - } catch (Exception ex) { - WorkflowException we = (WorkflowException) ex.getCause(); + } catch (WorkflowException we) { assertEquals(ErrorCode.E0743, we.getErrorCode()); // Make sure the message contains the node involved in the invalid transition assertTrue(we.getMessage().contains("three")); } - } /* @@ -1117,8 +1106,7 @@ public class TestLiteWorkflowAppParser extends XTestCase { try { invokeForkJoin(parser, def); fail("Expected to catch an exception but did not encounter any"); - } catch (Exception ex) { - WorkflowException we = (WorkflowException) ex.getCause(); + } catch (WorkflowException we) { assertEquals(ErrorCode.E0743, we.getErrorCode()); // Make sure the message contains the node involved in the invalid transition assertTrue(we.getMessage().contains("three")); @@ -1155,8 +1143,7 @@ public class TestLiteWorkflowAppParser extends XTestCase { try { invokeForkJoin(parser, def); fail("Expected to catch an exception but did not encounter any"); - } catch (Exception ex) { - WorkflowException we = (WorkflowException) ex.getCause(); + } catch (WorkflowException we) { assertEquals(ErrorCode.E0737, we.getErrorCode()); // Make sure the message contains the nodes involved in the invalid transition to end 
assertTrue(we.getMessage().contains("node [two]")); @@ -1270,8 +1257,7 @@ public class TestLiteWorkflowAppParser extends XTestCase { try { invokeForkJoin(parser, def); fail("Expected to catch an exception but did not encounter any"); - } catch (Exception ex) { - WorkflowException we = (WorkflowException) ex.getCause(); + } catch (WorkflowException we) { assertEquals(ErrorCode.E0743, we.getErrorCode()); // Make sure the message contains the node involved in the invalid transition assertTrue(we.getMessage().contains("four")); @@ -1314,8 +1300,7 @@ public class TestLiteWorkflowAppParser extends XTestCase { try { invokeForkJoin(parser, def); fail("Expected to catch an exception but did not encounter any"); - } catch (Exception ex) { - WorkflowException we = (WorkflowException) ex.getCause(); + } catch (WorkflowException we) { assertEquals(ErrorCode.E0743, we.getErrorCode()); // Make sure the message contains the node involved in the invalid transition assertTrue(we.getMessage().contains("four")); @@ -1391,12 +1376,10 @@ public class TestLiteWorkflowAppParser extends XTestCase { try { invokeForkJoin(parser, def); fail("Expected to catch an exception but did not encounter any"); - } catch (Exception ex) { - WorkflowException we = (WorkflowException) ex.getCause(); - assertEquals(ErrorCode.E0732, we.getErrorCode()); - assertTrue(we.getMessage().contains("Fork [f]")); - assertTrue(we.getMessage().contains("Join [j1]") && we.getMessage().contains("been [j2]") - || we.getMessage().contains("Join [j2]") && we.getMessage().contains("been [j1]")); + } catch (WorkflowException we) { + assertEquals(ErrorCode.E0757, we.getErrorCode()); + assertTrue(we.getMessage().contains("Fork node [f]")); + assertTrue(we.getMessage().contains("[j2,j1]")); } } @@ -1425,29 +1408,54 @@ public class TestLiteWorkflowAppParser extends XTestCase { try { invokeForkJoin(parser, def); fail("Expected to catch an exception but did not encounter any"); - } catch (Exception ex) { - WorkflowException we = 
(WorkflowException) ex.getCause(); + } catch (WorkflowException we) { assertEquals(ErrorCode.E0744, we.getErrorCode()); assertTrue(we.getMessage().contains("fork, [f],")); assertTrue(we.getMessage().contains("node, [two]")); } } - // Invoke private validateForkJoin method using Reflection API - private void invokeForkJoin(LiteWorkflowAppParser parser, LiteWorkflowApp def) throws Exception { - Class<? extends LiteWorkflowAppParser> c = parser.getClass(); - Class<?> d = Class.forName("org.apache.oozie.workflow.lite.LiteWorkflowAppParser$VisitStatus"); - Field f = d.getField("VISITING"); - Map traversed = new HashMap(); - traversed.put(def.getNode(StartNodeDef.START).getName(), f); - Method validate = c.getDeclaredMethod("validate", LiteWorkflowApp.class, NodeDef.class, Map.class); - validate.setAccessible(true); - // invoke validate method to populate the fork and join list - validate.invoke(parser, def, def.getNode(StartNodeDef.START), traversed); - Method validateForkJoin = c.getDeclaredMethod("validateForkJoin", LiteWorkflowApp.class); - validateForkJoin.setAccessible(true); - // invoke validateForkJoin - validateForkJoin.invoke(parser, def); + @SuppressWarnings("deprecation") + public void testForkJoinValidationTime() throws Exception { + final LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null, + LiteWorkflowStoreService.LiteControlNodeHandler.class, + LiteWorkflowStoreService.LiteDecisionHandler.class, + LiteWorkflowStoreService.LiteActionHandler.class); + + final LiteWorkflowApp app = parser.validateAndParse(IOUtils.getResourceAsReader("wf-long.xml", -1), + new Configuration()); + + final AtomicBoolean failure = new AtomicBoolean(false); + final AtomicBoolean finished = new AtomicBoolean(false); + + Runnable r = new Runnable() { + @Override + public void run() { + try { + invokeForkJoin(parser, app); + finished.set(true); + } catch (Exception e) { + e.printStackTrace(); + failure.set(true); + } + } + }; + + Thread t = new Thread(r); + t.start(); + 
t.join((long) (2000 * XTestCase.WAITFOR_RATIO)); + + if (!finished.get()) { + t.stop(); // don't let the validation keep running in the background which causes high CPU load + fail("Workflow validation did not finish in time"); + } + + assertFalse("Workflow validation failed", failure.get()); + } + + private void invokeForkJoin(LiteWorkflowAppParser parser, LiteWorkflowApp def) throws WorkflowException { + LiteWorkflowValidator validator = new LiteWorkflowValidator(); + validator.validateWorkflow(def, true); } // If Xerces 2.10.0 is not explicitly listed as a dependency in the poms, then Java will revert to an older version that has