OOZIE-1978 Forkjoin validation code is ridiculously slow in some cases (pbacsko 
via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8e9b9042
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8e9b9042
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8e9b9042

Branch: refs/heads/master
Commit: 8e9b9042b3270dc5ff975c44a5c977fcc41250e4
Parents: 5c89163
Author: Robert Kanter <rkan...@cloudera.com>
Authored: Wed Sep 21 17:44:21 2016 -0700
Committer: Robert Kanter <rkan...@cloudera.com>
Committed: Wed Sep 21 17:44:21 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/oozie/ErrorCode.java   |    4 +-
 .../org/apache/oozie/service/ActionService.java |    5 +
 .../workflow/lite/LiteWorkflowAppParser.java    |  364 +----
 .../workflow/lite/LiteWorkflowValidator.java    |  351 +++++
 .../oozie/command/wf/TestSubmitXCommand.java    |    2 +-
 .../lite/TestLiteWorkflowAppParser.java         |  112 +-
 core/src/test/resources/wf-long.xml             | 1456 ++++++++++++++++++
 release-log.txt                                 |    1 +
 8 files changed, 1914 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/main/java/org/apache/oozie/ErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java 
b/core/src/main/java/org/apache/oozie/ErrorCode.java
index 2907ca2..8fc3835 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -113,7 +113,7 @@ public enum ErrorCode {
     E0704(XLog.STD, "Definition already complete, application [{0}]"),
     E0705(XLog.STD, "Nnode already defined, node [{0}]"),
     E0706(XLog.STD, "Node cannot transition to itself node [{0}]"),
-    E0707(XLog.STD, "Loop detected at parsing, node [{0}]"),
+    E0707(XLog.STD, "Loop detected at parsing, node [{0}], path [{1}]"),
     E0708(XLog.STD, "Invalid transition, node [{0}] transition [{1}]"),
     E0709(XLog.STD, "Loop detected at runtime, node [{0}]"),
     E0710(XLog.STD, "Could not read the workflow definition, {0}"),
@@ -153,6 +153,8 @@ public enum ErrorCode {
     E0744(XLog.STD, "A fork, [{0}], is not allowed to have multiple 
transitions to the same node, [{1}]"),
     E0755(XLog.STD, "Workflow Job Rerun Error: {0}"),
     E0756(XLog.STD, "Exception parsing Kill node message [{0}]"),
+    E0757(XLog.STD, "Fork node [{0}] has multiple joins: [{1}]"),
+    E0758(XLog.STD, "Join node [{0}] has multiple forks: [{1}]"),
 
     E0800(XLog.STD, "Action it is not running its in [{1}] state, action 
[{0}]"),
     E0801(XLog.STD, "Workflow already running, workflow [{0}]"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/main/java/org/apache/oozie/service/ActionService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/ActionService.java 
b/core/src/main/java/org/apache/oozie/service/ActionService.java
index becc69b..a739a19 100644
--- a/core/src/main/java/org/apache/oozie/service/ActionService.java
+++ b/core/src/main/java/org/apache/oozie/service/ActionService.java
@@ -138,6 +138,11 @@ public class ActionService implements Service, 
Instrumentable {
         return (executorClass != null) ? (ActionExecutor) 
ReflectionUtils.newInstance(executorClass, null) : null;
     }
 
+    public boolean hasActionType(String actionType) {
+        ParamChecker.notEmpty(actionType, "actionType");
+        return executors.containsKey(actionType);
+    }
+
     Set<String> getActionTypes() {
         return executors.keySet();
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java 
b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
index 0541634..a74e5c7 100644
--- 
a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
+++ 
b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
@@ -18,57 +18,51 @@
 
 package org.apache.oozie.workflow.lite;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Reader;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.Validator;
+
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
-import org.apache.oozie.action.hadoop.FsActionExecutor;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.action.hadoop.FsActionExecutor;
 import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
+import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.Services;
 import org.apache.oozie.util.ELUtils;
 import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.ParameterVerifier;
 import org.apache.oozie.util.ParameterVerifierException;
 import org.apache.oozie.util.WritableUtils;
-import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.workflow.WorkflowException;
-import org.apache.oozie.action.ActionExecutor;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.ActionService;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.jdom.Element;
 import org.jdom.JDOMException;
 import org.jdom.Namespace;
 import org.xml.sax.SAXException;
 
-import javax.xml.transform.stream.StreamSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.Validator;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.io.ByteArrayOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.zip.*;
-
 /**
  * Class to parse and validate workflow xml
  */
@@ -124,29 +118,6 @@ public class LiteWorkflowAppParser {
     private Class<? extends DecisionNodeHandler> decisionHandlerClass;
     private Class<? extends ActionNodeHandler> actionHandlerClass;
 
-    private static enum VisitStatus {
-        VISITING, VISITED
-    }
-
-    /**
-     * We use this to store a node name and its top (eldest) decision parent 
node name for the forkjoin validation
-     */
-    class NodeAndTopDecisionParent {
-        String node;
-        String topDecisionParent;
-
-        public NodeAndTopDecisionParent(String node, String topDecisionParent) 
{
-            this.node = node;
-            this.topDecisionParent = topDecisionParent;
-        }
-    }
-
-    private List<String> forkList = new ArrayList<String>();
-    private List<String> joinList = new ArrayList<String>();
-    private StartNodeDef startNode;
-    private List<NodeAndTopDecisionParent> visitedOkNodes = new 
ArrayList<NodeAndTopDecisionParent>();
-    private List<String> visitedJoinNodes = new ArrayList<String>();
-
     private String defaultNameNode;
     private String defaultJobTracker;
 
@@ -201,14 +172,18 @@ public class LiteWorkflowAppParser {
             Element wfDefElement = XmlUtils.parseXml(strDef);
             ParameterVerifier.verifyParameters(jobConf, wfDefElement);
             LiteWorkflowApp app = parse(strDef, wfDefElement, configDefault, 
jobConf);
-            Map<String, VisitStatus> traversed = new HashMap<String, 
VisitStatus>();
-            traversed.put(app.getNode(StartNodeDef.START).getName(), 
VisitStatus.VISITING);
-            validate(app, app.getNode(StartNodeDef.START), traversed);
-            //Validate whether fork/join are in pair or not
+
+
+            boolean validateForkJoin = false;
+
             if (jobConf.getBoolean(WF_VALIDATE_FORK_JOIN, true)
                     && ConfigurationService.getBoolean(VALIDATE_FORK_JOIN)) {
-                validateForkJoin(app);
+                validateForkJoin = true;
             }
+
+            LiteWorkflowValidator validator = new LiteWorkflowValidator();
+            validator.validateWorkflow(app, validateForkJoin);
+
             return app;
         }
         catch (ParameterVerifierException ex) {
@@ -226,201 +201,6 @@ public class LiteWorkflowAppParser {
     }
 
     /**
-     * Validate whether fork/join are in pair or not
-     * @param app LiteWorkflowApp
-     * @throws WorkflowException
-     */
-    private void validateForkJoin(LiteWorkflowApp app) throws 
WorkflowException {
-        // Make sure the number of forks and joins in wf are equal
-        if (forkList.size() != joinList.size()) {
-            throw new WorkflowException(ErrorCode.E0730);
-        }
-
-        // No need to bother going through all of this if there are no 
fork/join nodes
-        if (!forkList.isEmpty()) {
-            visitedOkNodes.clear();
-            visitedJoinNodes.clear();
-            validateForkJoin(startNode, app, new LinkedList<String>(), new 
LinkedList<String>(), new LinkedList<String>(), true,
-                    null);
-        }
-    }
-
-    /*
-     * Recursively walk through the DAG and make sure that all fork paths are 
valid.
-     * This should be called from validateForkJoin(LiteWorkflowApp app).  It 
assumes that visitedOkNodes and visitedJoinNodes are
-     * both empty ArrayLists on the first call.
-     *
-     * @param node the current node; use the startNode on the first call
-     * @param app the WorkflowApp
-     * @param forkNodes a stack of the current fork nodes
-     * @param joinNodes a stack of the current join nodes
-     * @param path a stack of the current path
-     * @param okTo false if node (or an ancestor of node) was gotten to via an 
"error to" transition or via a join node that has
-     * already been visited at least once before
-     * @param topDecisionParent The top (eldest) decision node along the path 
to this node, or null if there isn't one
-     * @throws WorkflowException
-     */
-    private void validateForkJoin(NodeDef node, LiteWorkflowApp app, 
Deque<String> forkNodes, Deque<String> joinNodes,
-            Deque<String> path, boolean okTo, String topDecisionParent) throws 
WorkflowException {
-        if (path.contains(node.getName())) {
-            // cycle
-            throw new WorkflowException(ErrorCode.E0741, node.getName(), 
Arrays.toString(path.toArray()));
-        }
-        path.push(node.getName());
-
-        // Make sure that we're not revisiting a node (that's not a Kill, 
Join, or End type) that's been visited before from an
-        // "ok to" transition; if its from an "error to" transition, then its 
okay to visit it multiple times.  Also, because we
-        // traverse through join nodes multiple times, we have to make sure 
not to throw an exception here when we're really just
-        // re-walking the same execution path (this is why we need the 
visitedJoinNodes list used later)
-        if (okTo && !(node instanceof KillNodeDef) && !(node instanceof 
JoinNodeDef) && !(node instanceof EndNodeDef)) {
-            NodeAndTopDecisionParent natdp = 
findInVisitedOkNodes(node.getName());
-            if (natdp != null) {
-                // However, if we've visited the node and it's under a 
decision node, we may be seeing it again and it's only
-                // illegal if that decision node is not the same as what we're 
seeing now (because during execution we only go
-                // down one path of the decision node, so while we're seeing 
the node multiple times here, during runtime it will
-                // only be executed once).  Also, this decision node should be 
the top (eldest) decision node.  As null indicates
-                // that there isn't a decision node, when this happens they 
must both be null to be valid.  Here is a good example
-                // to visualize a node ("actionX") that has three "ok to" 
paths to it, but should still be a valid workflow (it may
-                // be easier to see if you draw it):
-                    // decisionA --> {actionX, decisionB}
-                    // decisionB --> {actionX, actionY}
-                    // actionY   --> {actionX}
-                // And, if we visit this node twice under the same decision 
node in an invalid way, the path cycle checking code
-                // will catch it, so we don't have to worry about that here.
-                if ((natdp.topDecisionParent == null && topDecisionParent == 
null)
-                     || (natdp.topDecisionParent == null && topDecisionParent 
!= null)
-                     || (natdp.topDecisionParent != null && topDecisionParent 
== null)
-                     || !natdp.topDecisionParent.equals(topDecisionParent)) {
-                    // If we get here, then we've seen this node before from 
an "ok to" transition but they don't have the same
-                    // decision node top parent, which means that this node 
will be executed twice, which is illegal
-                    throw new WorkflowException(ErrorCode.E0743, 
node.getName());
-                }
-            }
-            else {
-                // If we haven't transitioned to this node before, add it and 
its top decision parent node
-                visitedOkNodes.add(new 
NodeAndTopDecisionParent(node.getName(), topDecisionParent));
-            }
-        }
-
-        if (node instanceof StartNodeDef) {
-            String transition = node.getTransitions().get(0);   // start 
always has only 1 transition
-            NodeDef tranNode = app.getNode(transition);
-            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, 
topDecisionParent);
-        }
-        else if (node instanceof ActionNodeDef) {
-            String transition = node.getTransitions().get(0);   // "ok to" 
transition
-            NodeDef tranNode = app.getNode(transition);
-            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo, 
topDecisionParent);  // propogate okTo
-            transition = node.getTransitions().get(1);          // "error to" 
transition
-            tranNode = app.getNode(transition);
-            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false, 
topDecisionParent); // use false
-        }
-        else if (node instanceof DecisionNodeDef) {
-            for(String transition : (new 
HashSet<String>(node.getTransitions()))) {
-                NodeDef tranNode = app.getNode(transition);
-                // if there currently isn't a topDecisionParent (i.e. null), 
then use this node instead of propagating null
-                String parentDecisionNode = topDecisionParent;
-                if (parentDecisionNode == null) {
-                    parentDecisionNode = node.getName();
-                }
-                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
okTo, parentDecisionNode);
-            }
-        }
-        else if (node instanceof ForkNodeDef) {
-            forkNodes.push(node.getName());
-            List<String> transitionsList = node.getTransitions();
-            HashSet<String> transitionsSet = new 
HashSet<String>(transitionsList);
-            // Check that a fork doesn't go to the same node more than once
-            if (!transitionsList.isEmpty() && transitionsList.size() != 
transitionsSet.size()) {
-                // Now we have to figure out which node is the problem and 
what type of node they are (join and kill are ok)
-                for (int i = 0; i < transitionsList.size(); i++) {
-                    String a = transitionsList.get(i);
-                    NodeDef aNode = app.getNode(a);
-                    if (!(aNode instanceof JoinNodeDef) && !(aNode instanceof 
KillNodeDef)) {
-                        for (int k = i+1; k < transitionsList.size(); k++) {
-                            String b = transitionsList.get(k);
-                            if (a.equals(b)) {
-                                throw new WorkflowException(ErrorCode.E0744, 
node.getName(), a);
-                            }
-                        }
-                    }
-                }
-            }
-            for(String transition : transitionsSet) {
-                NodeDef tranNode = app.getNode(transition);
-                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
okTo, topDecisionParent);
-            }
-            forkNodes.pop();
-            if (!joinNodes.isEmpty()) {
-                joinNodes.pop();
-            }
-        }
-        else if (node instanceof JoinNodeDef) {
-            if (forkNodes.isEmpty()) {
-                // no fork for join to match with
-                throw new WorkflowException(ErrorCode.E0742, node.getName());
-            }
-            if (forkNodes.size() > joinNodes.size() && (joinNodes.isEmpty() || 
!joinNodes.peek().equals(node.getName()))) {
-                joinNodes.push(node.getName());
-            }
-            if (!joinNodes.peek().equals(node.getName())) {
-                // join doesn't match fork
-                throw new WorkflowException(ErrorCode.E0732, forkNodes.peek(), 
node.getName(), joinNodes.peek());
-            }
-            joinNodes.pop();
-            String currentForkNode = forkNodes.pop();
-            String transition = node.getTransitions().get(0);   // join always 
has only 1 transition
-            NodeDef tranNode = app.getNode(transition);
-            // If we're already under a situation where okTo is false, use 
false (propogate it)
-            // Or if we've already visited this join node, use false (because 
we've already traversed this path before and we don't
-            // want to throw an exception from the check against 
visitedOkNodes)
-            if (!okTo || visitedJoinNodes.contains(node.getName())) {
-                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
false, topDecisionParent);
-            // Else, use true because this is either the first time we've gone 
through this join node or okTo was already false
-            } else {
-                visitedJoinNodes.add(node.getName());
-                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, 
true, topDecisionParent);
-            }
-            forkNodes.push(currentForkNode);
-            joinNodes.push(node.getName());
-        }
-        else if (node instanceof KillNodeDef) {
-            // do nothing
-        }
-        else if (node instanceof EndNodeDef) {
-            if (!forkNodes.isEmpty()) {
-                path.pop();     // = node
-                String parent = path.peek();
-                // can't go to an end node in a fork
-                throw new WorkflowException(ErrorCode.E0737, parent, 
node.getName());
-            }
-        }
-        else {
-            // invalid node type (shouldn't happen)
-            throw new WorkflowException(ErrorCode.E0740, node.getName());
-        }
-        path.pop();
-    }
-
-    /**
-     * Return a {@link NodeAndTopDecisionParent} whose {@link 
NodeAndTopDecisionParent#node} is equal to the passed in name, or null
-     * if it isn't in the {@link LiteWorkflowAppParser#visitedOkNodes} list.
-     *
-     * @param name The name to search for
-     * @return a NodeAndTopDecisionParent or null
-     */
-    private NodeAndTopDecisionParent findInVisitedOkNodes(String name) {
-        NodeAndTopDecisionParent natdp = null;
-        for (NodeAndTopDecisionParent v : visitedOkNodes) {
-            if (v.node.equals(name)) {
-                natdp = v;
-                break;
-            }
-        }
-        return natdp;
-    }
-
-    /**
      * Parse xml to {@link LiteWorkflowApp}
      *
      * @param strDef
@@ -573,76 +353,6 @@ public class LiteWorkflowAppParser {
         return Base64.encodeBase64String(baos.toByteArray());
     }
 
-    /**
-     * Validate workflow xml
-     *
-     * @param app
-     * @param node
-     * @param traversed
-     * @throws WorkflowException
-     */
-    private void validate(LiteWorkflowApp app, NodeDef node, Map<String, 
VisitStatus> traversed) throws WorkflowException {
-        if (node instanceof StartNodeDef) {
-            startNode = (StartNodeDef) node;
-        }
-        else {
-            try {
-                ParamChecker.validateActionName(node.getName());
-            }
-            catch (IllegalArgumentException ex) {
-                throw new WorkflowException(ErrorCode.E0724, ex.getMessage());
-            }
-        }
-        if (node instanceof ActionNodeDef) {
-            try {
-                Element action = XmlUtils.parseXml(node.getConf());
-                boolean supportedAction = 
Services.get().get(ActionService.class).getExecutor(action.getName()) != null;
-                if (!supportedAction) {
-                    throw new WorkflowException(ErrorCode.E0723, 
node.getName(), action.getName());
-                }
-            }
-            catch (JDOMException ex) {
-                throw new RuntimeException("It should never happen, " + 
ex.getMessage(), ex);
-            }
-        }
-
-        if(node instanceof ForkNodeDef){
-            forkList.add(node.getName());
-        }
-
-        if(node instanceof JoinNodeDef){
-            joinList.add(node.getName());
-        }
-
-        if (node instanceof EndNodeDef) {
-            traversed.put(node.getName(), VisitStatus.VISITED);
-            return;
-        }
-        if (node instanceof KillNodeDef) {
-            traversed.put(node.getName(), VisitStatus.VISITED);
-            return;
-        }
-        for (String transition : node.getTransitions()) {
-
-            if (app.getNode(transition) == null) {
-                throw new WorkflowException(ErrorCode.E0708, node.getName(), 
transition);
-            }
-
-            //check if it is a cycle
-            if (traversed.get(app.getNode(transition).getName()) == 
VisitStatus.VISITING) {
-                throw new WorkflowException(ErrorCode.E0707, 
app.getNode(transition).getName());
-            }
-            //ignore validated one
-            if (traversed.get(app.getNode(transition).getName()) == 
VisitStatus.VISITED) {
-                continue;
-            }
-
-            traversed.put(app.getNode(transition).getName(), 
VisitStatus.VISITING);
-            validate(app, app.getNode(transition), traversed);
-        }
-        traversed.put(node.getName(), VisitStatus.VISITED);
-    }
-
     private void addChildElement(Element parent, Namespace ns, String 
childName, String childValue) {
         Element child = new Element(childName, ns);
         child.setText(childValue);

http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowValidator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowValidator.java 
b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowValidator.java
new file mode 100644
index 0000000..24c05af
--- /dev/null
+++ 
b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowValidator.java
@@ -0,0 +1,351 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.workflow.lite;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.service.ActionService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.ParamChecker;
+import org.apache.oozie.util.XmlUtils;
+import org.apache.oozie.workflow.WorkflowException;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+
+public class LiteWorkflowValidator {
+
+    public void validateWorkflow(LiteWorkflowApp app, boolean 
validateForkJoin) throws WorkflowException {
+        NodeDef startNode = app.getNode(StartNodeDef.START);
+        if (startNode == null) {
+            throw new WorkflowException(ErrorCode.E0700, "no start node"); // 
shouldn't happen, but just in case...
+        }
+
+        ForkJoinCount forkJoinCount = new ForkJoinCount();
+
+        performBasicValidation(app, startNode, new ArrayDeque<String>(), new 
HashSet<NodeDef>(), forkJoinCount);
+
+        if (validateForkJoin) {
+            // don't validate fork/join pairs if the number of forks and joins 
mismatch
+            if (forkJoinCount.forks != forkJoinCount.joins) {
+                throw new WorkflowException(ErrorCode.E0730);
+            }
+
+            validateForkJoin(app,
+                    startNode,
+                    null,
+                    null,
+                    true,
+                    new ArrayDeque<String>(),
+                    new HashMap<String, String>(),
+                    new HashMap<String, Optional<String>>());
+        }
+    }
+
+    /**
+     * Basic recursive validation of the workflow:
+     * - it is acyclic, no loops
+     * - names of the actions follow a specific pattern
+     * - all nodes have valid transitions
+     * - it only has supported action nodes
+     * - there is no node that points to itself
+     * - counts fork/join nodes
+     *
+     * @param app The WorkflowApp
+     * @param node Current node we're checking
+     * @param path The list of nodes that we've visited so far in this call 
chain
+     * @param checkedNodes The set of nodes that we've already checked. For 
example, if it's a decision node, then we
+     * don't have to re-walk the entire path because it indicates that it has 
been done before on a separate path
+     * @param forkJoinCount Number of fork and join nodes
+     * @throws WorkflowException If any of the constraints described 
above is violated
+     */
+    private void performBasicValidation(LiteWorkflowApp app, NodeDef node, 
Deque<String> path, Set<NodeDef> checkedNodes,
+            ForkJoinCount forkJoinCount) throws WorkflowException {
+        String nodeName = node.getName();
+
+        checkActionName(node);
+        if (node instanceof ActionNodeDef) {
+            checkActionNode(node);
+        } else if (node instanceof ForkNodeDef) {
+            forkJoinCount.forks++;
+        } else if (node instanceof JoinNodeDef) {
+            forkJoinCount.joins++;
+        }
+        checkCycle(path, nodeName);
+
+        path.addLast(nodeName);
+
+        List<String> transitions = node.getTransitions();
+        // Get all transitions and walk the workflow recursively
+        if (!transitions.isEmpty()) {
+            for (final String t : transitions) {
+                NodeDef transitionNode = app.getNode(t);
+                if (transitionNode == null) {
+                    throw new WorkflowException(ErrorCode.E0708, 
node.getName(), t);
+                }
+
+                if (!checkedNodes.contains(transitionNode)) {
+                    performBasicValidation(app, transitionNode, path, 
checkedNodes, forkJoinCount);
+                    checkedNodes.add(transitionNode);
+                }
+            }
+        }
+
+        path.remove(nodeName);
+    }
+
+    /**
+     * This method recursively validates two things:
+     * - fork/join nodes are properly paired
+     * - there are no multiple "okTo" paths to a given node
+     *
+     * Important: this method assumes that the workflow is acyclic - 
therefore this must run after performBasicValidation()
+     *
+     * @param app The WorkflowApp
+     * @param node Current node we're checking
+     * @param currentFork Current fork node (null if we are not under a fork 
path)
+     * @param topDecisionParent The top (eldest) decision node along the path 
to this node, or null if there isn't one
+     * @param okPath false if node (or an ancestor of node) was gotten to via 
an "error to" transition or via a join node that has
+     * already been visited at least once before
+     * @param forkJoins Map that contains a mapping of fork-join node pairs.
+     * @param nodeAndDecisionParents Map that contains a mapping of nodes and 
their eldest decision node
+     * @throws WorkflowException If any of the constraints described 
above is violated
+     */
+    private void validateForkJoin(LiteWorkflowApp app, NodeDef node, NodeDef 
currentFork, String topDecisionParent,
+            boolean okPath, Deque<String> path, Map<String, String> forkJoins,
+            Map<String, Optional<String>> nodeAndDecisionParents) throws 
WorkflowException {
+        final String nodeName = node.getName();
+
+        path.addLast(nodeName);
+
+        /* If we're walking an "okTo" path and the nodes are not 
Kill/Join/End, we have to make sure that only a single
+         * "okTo" path exists to the current node.
+         *
+         * The "topDecisionParent" represents the eldest decision in the chain 
that we've gone through. For example, let's assume
+         * that D1, D2, D3 are decision nodes and A is an action node.
+         *
+         * D1-->D2-->D3---> ... (rest of the WF)
+         *  |   |    |
+         *  |   |    |
+         *  |   |    +----> +---+
+         *  |   +---------> | A |
+         *  +-------------> +---+
+         *
+         * In this case, there are three "okTo" paths to "A" but it's still a 
valid workflow because the eldest decision node
+         * is D1 and during every run, there is only one possible execution 
path that leads to A (D1->A, D1->D2->A or
+         * D1->D2->D3->A). In the code, if we encounter a decision node and 
we already have one, we don't update it. If it's null
+         * then we set it to the current decision node we're under.
+         *
+         * If the "current" and "top" parents are null, it means that we 
reached the node from two separate "okTo" paths, which is
+         * not acceptable.
+         *
+         * Also, if we have two distinct top decision parents it means that 
the node is reachable from two decision paths which
+         * are not "chained" (like in the example).
+         *
+         * It's worth noting that the last two examples can only occur in case 
of fork-join when we start to execute at least
+         * two separate paths in parallel. Without fork-join, multiple parents 
or two null parents would mean that there is a loop
+         * in the workflow but that should not happen since it has been 
validated.
+         */
+        if (okPath && !(node instanceof KillNodeDef) && !(node instanceof 
JoinNodeDef) && !(node instanceof EndNodeDef)) {
+            // using Optional here so we can distinguish between "non-visited" 
and "visited - no parent" state.
+            Optional<String> decisionParentOpt = 
nodeAndDecisionParents.get(nodeName);
+            if (decisionParentOpt == null) {
+                nodeAndDecisionParents.put(node.getName(), 
Optional.fromNullable(topDecisionParent));
+            } else {
+                String decisionParent = decisionParentOpt.isPresent() ? 
decisionParentOpt.get() : null;
+
+                if ((decisionParent == null && topDecisionParent == null) || 
!Objects.equal(decisionParent, topDecisionParent)) {
+                    throw new WorkflowException(ErrorCode.E0743, nodeName);
+                }
+            }
+        }
+
+        /* Fork-Join validation logic:
+         *
+         * At each Fork node, we recurse to every possible paths, changing the 
"currentFork" variable to the Fork node. We stop
+         * walking as soon as we encounter a Join node. At the Join node, we 
update the forkJoin mapping, which maintains
+         * the relationship between every fork-join pair (actually it's 
join->fork mapping). We check whether the join->fork
+         * mapping already contains another Fork node, which means that the 
Join is reachable from at least two distinct
+         * Fork nodes, so we terminate the validation.
+         *
+         * From the Join node, we don't recurse further. Therefore, all 
recursive calls return back to the point where we called
+         * validateForkJoin() from the Fork node in question.
+         *
+         * At this point, we have to check how many different Join nodes we've 
found at each different paths. We collect them to
+         * a set, then we make sure that we have only a single Join node for 
all Fork paths. Otherwise the workflow is broken.
+         *
+         * If we have only a single Join, then we get the transition node from 
the Join and go on with the recursive validation -
+         * this time we use the original "currentFork" variable that we have 
on the stack. With this approach, nested
+         * Fork-Joins are handled correctly.
+         */
+        if (node instanceof ForkNodeDef) {
+            final List<String> transitions = node.getTransitions();
+
+            checkForkTransitions(app, transitions, node);
+
+            for (String t : transitions) {
+                NodeDef transition = app.getNode(t);
+                validateForkJoin(app, transition, node, topDecisionParent, 
okPath, path, forkJoins, nodeAndDecisionParents);
+            }
+
+            // get the Join node for this ForkNode & validate it (we must have 
only one)
+            Set<String> joins = new HashSet<String>();
+            collectJoins(app, forkJoins, nodeName, joins);
+            checkJoins(joins, nodeName);
+
+            List<String> joinTransitions = 
app.getNode(joins.iterator().next()).getTransitions();
+            NodeDef next = app.getNode(joinTransitions.get(0));
+
+            validateForkJoin(app, next, currentFork, topDecisionParent, 
okPath, path, forkJoins, nodeAndDecisionParents);
+        } else if (node instanceof JoinNodeDef) {
+            if (currentFork == null) {
+                throw new WorkflowException(ErrorCode.E0742, node.getName());
+            }
+
+            // join --> fork mapping
+            String forkNode = forkJoins.get(nodeName);
+            if (forkNode == null) {
+                forkJoins.put(nodeName, currentFork.getName());
+            } else if (!forkNode.equals(currentFork.getName())) {
+                throw new WorkflowException(ErrorCode.E0758, node.getName(), 
forkNode + "," + currentFork);
+            }
+        } else if (node instanceof DecisionNodeDef) {
+            List<String> transitions = node.getTransitions();
+
+            // see explanation above - if we already have a topDecisionParent, 
we don't update it
+            String parentDecisionNode = topDecisionParent;
+            if (parentDecisionNode == null) {
+                parentDecisionNode = nodeName;
+            }
+
+            for (String t : transitions) {
+                NodeDef transition = app.getNode(t);
+                validateForkJoin(app, transition, currentFork, 
parentDecisionNode, okPath, path, forkJoins,
+                        nodeAndDecisionParents);
+            }
+        } else if (node instanceof KillNodeDef) {
+            // no op
+        } else if (node instanceof EndNodeDef) {
+            // We can't end the WF if we're on a Fork path. From the "path" 
deque, we remove the last node (which
+            // is the current "End") and look at last node again so we know 
where we came from
+            if (currentFork != null) {
+                path.removeLast();
+                String previous = path.peekLast();
+                throw new WorkflowException(ErrorCode.E0737, previous, 
node.getName());
+            }
+        } else if (node instanceof ActionNodeDef) {
+            String transition = node.getTransitions().get(0);   // "ok to" 
transition
+            NodeDef okNode = app.getNode(transition);
+            validateForkJoin(app, okNode, currentFork, topDecisionParent, 
true, path, forkJoins, nodeAndDecisionParents);
+
+            transition = node.getTransitions().get(1);          // "error to" 
transition
+            NodeDef errorNode = app.getNode(transition);
+            validateForkJoin(app, errorNode, currentFork, topDecisionParent, 
false, path, forkJoins, nodeAndDecisionParents);
+        } else if (node instanceof StartNodeDef) {
+            String transition = node.getTransitions().get(0);   // start 
always has only 1 transition
+            NodeDef tranNode = app.getNode(transition);
+            validateForkJoin(app, tranNode, currentFork, topDecisionParent, 
okPath, path, forkJoins, nodeAndDecisionParents);
+        } else {
+            throw new WorkflowException(ErrorCode.E0740, node.getClass());
+        }
+
+        path.remove(nodeName);
+    }
+
+    /**
+     * Checks the node's name with ParamChecker.validateActionName(); start nodes are exempt from the check.
+     *
+     * @param node the node whose name is checked
+     * @throws WorkflowException with E0724 (carrying the checker's message) if the name is rejected
+     */
+    private void checkActionName(NodeDef node) throws WorkflowException {
+        if (node instanceof StartNodeDef) {
+            return;
+        }
+
+        try {
+            ParamChecker.validateActionName(node.getName());
+        } catch (IllegalArgumentException invalidName) {
+            throw new WorkflowException(ErrorCode.E0724, invalidName.getMessage());
+        }
+    }
+
+    /**
+     * Parses the node's configuration XML and checks that its root element names an action type known to the
+     * ActionService.
+     *
+     * @param node the action node to check
+     * @throws WorkflowException with E0723 if the action type is not supported, or with E0700 if the configuration
+     * cannot be parsed
+     */
+    private void checkActionNode(NodeDef node) throws WorkflowException {
+        try {
+            final String actionType = XmlUtils.parseXml(node.getConf()).getName();
+
+            if (!Services.get().get(ActionService.class).hasActionType(actionType)) {
+                throw new WorkflowException(ErrorCode.E0723, node.getName(), actionType);
+            }
+        } catch (JDOMException parseError) {
+            throw new WorkflowException(ErrorCode.E0700, "JDOMException: " + parseError.getMessage());
+        }
+    }
+
+    /**
+     * Fails if the given node is already on the current path, i.e. walking to it again would close a loop.
+     *
+     * @param path the node names walked so far; on failure the offending name is appended so that the reported
+     * path shows the full loop
+     * @param nodeName the name of the node about to be visited
+     * @throws WorkflowException with E0707 if nodeName is already contained in path
+     */
+    private void checkCycle(Deque<String> path, String nodeName) throws WorkflowException {
+        if (!path.contains(nodeName)) {
+            return;
+        }
+
+        path.addLast(nodeName);
+        final String loop = Joiner.on("->").join(path);
+        throw new WorkflowException(ErrorCode.E0707, nodeName, loop);
+    }
+
+    /**
+     * Checks that a fork does not transition to the same node more than once. Join and kill nodes are allowed to
+     * be targeted repeatedly.
+     *
+     * @param app the workflow application used to resolve transition names to nodes
+     * @param transitionsList the names of the nodes the fork transitions to
+     * @param node the fork node being checked
+     * @throws WorkflowException with E0744 naming the fork and the first repeated target found
+     */
+    private void checkForkTransitions(LiteWorkflowApp app, List<String> transitionsList, NodeDef node) throws WorkflowException {
+        for (final String target : transitionsList) {
+            final NodeDef targetNode = app.getNode(target);
+
+            // repeated transitions to a join or kill node are fine
+            if (targetNode instanceof JoinNodeDef || targetNode instanceof KillNodeDef) {
+                continue;
+            }
+
+            if (CollectionUtils.cardinality(target, transitionsList) > 1) {
+                throw new WorkflowException(ErrorCode.E0744, node.getName(), target);
+            }
+        }
+    }
+
+    /**
+     * Adds to "joins" the name of every join node that the join->fork mapping associates with the given fork.
+     *
+     * @param app the workflow application used to look up the join nodes
+     * @param forkJoinPairs mapping whose keys are join node names and whose values are fork node names
+     * @param nodeName the name of the fork whose joins are collected
+     * @param joins output set receiving the names of the matching join nodes
+     */
+    private void collectJoins(LiteWorkflowApp app, Map<String, String> forkJoinPairs, String nodeName, Set<String> joins) {
+        for (Entry<String, String> joinToFork : forkJoinPairs.entrySet()) {
+            final String forkName = joinToFork.getValue();
+            if (!forkName.equals(nodeName)) {
+                continue;
+            }
+
+            final NodeDef joinNode = app.getNode(joinToFork.getKey());
+            joins.add(joinNode.getName());
+        }
+    }
+
+    /**
+     * Checks that exactly one join node was found for a fork.
+     *
+     * @param joinNodes the names of the join nodes collected for the fork
+     * @param forkName the name of the fork
+     * @throws WorkflowException with E0733 if no join was found, or with E0757 (listing the joins) if more than
+     * one was found
+     */
+    private void checkJoins(Set<String> joinNodes, String forkName) throws WorkflowException {
+        final int joinCount = joinNodes.size();
+
+        if (joinCount == 0) {
+            throw new WorkflowException(ErrorCode.E0733, forkName);
+        } else if (joinCount > 1) {
+            throw new WorkflowException(ErrorCode.E0757, forkName, Joiner.on(",").join(joinNodes));
+        }
+    }
+
+    // Tiny utility class where we keep track of how many fork and join nodes we have found.
+    // Declared static: it only holds two counters and never uses the enclosing instance, so it should not carry
+    // a hidden reference to it.
+    private static class ForkJoinCount {
+        int forks = 0;
+        int joins = 0;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java 
b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java
index 3c893d0..47ff8ca 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java
@@ -215,7 +215,7 @@ public class TestSubmitXCommand extends XDataTestCase {
             fail("Should have gotten E0707 because the XML has a loop");
         } catch (CommandException ce) {
             assertEquals(ErrorCode.E0707, ce.getErrorCode());
-            assertEquals("E0707: Loop detected at parsing, node [a]", 
ce.getMessage());
+            assertEquals("E0707: Loop detected at parsing, node [a], path 
[:start:->a->c->a]", ce.getMessage());
         }
 
         conf = new XConfiguration();

http://git-wip-us.apache.org/repos/asf/oozie/blob/8e9b9042/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
 
b/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
index 9002b6c..9e439b4 100644
--- 
a/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
+++ 
b/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
@@ -20,27 +20,20 @@ package org.apache.oozie.workflow.lite;
 
 
 import java.io.StringReader;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.oozie.service.ActionService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.SchemaService;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.service.TestLiteWorkflowAppService;
+import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.workflow.WorkflowException;
 import 
org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestActionNodeHandler;
 import 
org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestDecisionNodeHandler;
-import org.apache.oozie.test.XTestCase;
-import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.action.hadoop.DistcpActionExecutor;
-import org.apache.oozie.action.hadoop.HiveActionExecutor;
-import org.apache.hadoop.conf.Configuration;
 
 public class TestLiteWorkflowAppParser extends XTestCase {
     public static String dummyConf = "<java></java>";
@@ -399,7 +392,7 @@ public class TestLiteWorkflowAppParser extends XTestCase {
 
         // No default NN is set
         try {
-            LiteWorkflowApp app = 
parser.validateAndParse(IOUtils.getResourceAsReader("wf-schema-no-namenode.xml",
 -1),
+            
parser.validateAndParse(IOUtils.getResourceAsReader("wf-schema-no-namenode.xml",
 -1),
                     new Configuration());
             fail();
         } catch (WorkflowException e) {
@@ -501,7 +494,7 @@ public class TestLiteWorkflowAppParser extends XTestCase {
 
         // No default NN is set
         try {
-            LiteWorkflowApp app = 
parser.validateAndParse(IOUtils.getResourceAsReader("wf-schema-no-jobtracker.xml",
 -1),
+            
parser.validateAndParse(IOUtils.getResourceAsReader("wf-schema-no-jobtracker.xml",
 -1),
                     new Configuration());
             fail();
         } catch (WorkflowException e) {
@@ -780,8 +773,7 @@ public class TestLiteWorkflowAppParser extends XTestCase {
         try {
             invokeForkJoin(parser, def);
             fail("Expected to catch an exception but did not encounter any");
-        } catch (Exception ex) {
-            WorkflowException we = (WorkflowException) ex.getCause();
+        } catch (WorkflowException we) {
             assertEquals(ErrorCode.E0737, we.getErrorCode());
             // Make sure the message contains the nodes involved in the 
invalid transition to end
             assertTrue(we.getMessage().contains("node [three]"));
@@ -826,8 +818,7 @@ public class TestLiteWorkflowAppParser extends XTestCase {
         try {
             invokeForkJoin(parser, def);
             fail("Expected to catch an exception but did not encounter any");
-        } catch (Exception ex) {
-            WorkflowException we = (WorkflowException) ex.getCause();
+        } catch (WorkflowException we) {
             assertEquals(ErrorCode.E0742, we.getErrorCode());
             assertTrue(we.getMessage().contains("[j2]"));
         }
@@ -861,13 +852,11 @@ public class TestLiteWorkflowAppParser extends XTestCase {
         try {
             invokeForkJoin(parser, def);
             fail("Expected to catch an exception but did not encounter any");
-        } catch (Exception ex) {
-            WorkflowException we = (WorkflowException) ex.getCause();
+        } catch (WorkflowException we) {
             assertEquals(ErrorCode.E0743, we.getErrorCode());
             // Make sure the message contains the node involved in the invalid 
transition
             assertTrue(we.getMessage().contains("three"));
         }
-
     }
 
     /*
@@ -1117,8 +1106,7 @@ public class TestLiteWorkflowAppParser extends XTestCase {
         try {
             invokeForkJoin(parser, def);
             fail("Expected to catch an exception but did not encounter any");
-        } catch (Exception ex) {
-            WorkflowException we = (WorkflowException) ex.getCause();
+        } catch (WorkflowException we) {
             assertEquals(ErrorCode.E0743, we.getErrorCode());
             // Make sure the message contains the node involved in the invalid 
transition
             assertTrue(we.getMessage().contains("three"));
@@ -1155,8 +1143,7 @@ public class TestLiteWorkflowAppParser extends XTestCase {
         try {
             invokeForkJoin(parser, def);
             fail("Expected to catch an exception but did not encounter any");
-        } catch (Exception ex) {
-            WorkflowException we = (WorkflowException) ex.getCause();
+        } catch (WorkflowException we) {
             assertEquals(ErrorCode.E0737, we.getErrorCode());
             // Make sure the message contains the nodes involved in the 
invalid transition to end
             assertTrue(we.getMessage().contains("node [two]"));
@@ -1270,8 +1257,7 @@ public class TestLiteWorkflowAppParser extends XTestCase {
         try {
             invokeForkJoin(parser, def);
             fail("Expected to catch an exception but did not encounter any");
-        } catch (Exception ex) {
-            WorkflowException we = (WorkflowException) ex.getCause();
+        } catch (WorkflowException we) {
             assertEquals(ErrorCode.E0743, we.getErrorCode());
             // Make sure the message contains the node involved in the invalid 
transition
             assertTrue(we.getMessage().contains("four"));
@@ -1314,8 +1300,7 @@ public class TestLiteWorkflowAppParser extends XTestCase {
         try {
             invokeForkJoin(parser, def);
             fail("Expected to catch an exception but did not encounter any");
-        } catch (Exception ex) {
-            WorkflowException we = (WorkflowException) ex.getCause();
+        } catch (WorkflowException we) {
             assertEquals(ErrorCode.E0743, we.getErrorCode());
             // Make sure the message contains the node involved in the invalid 
transition
             assertTrue(we.getMessage().contains("four"));
@@ -1391,12 +1376,10 @@ public class TestLiteWorkflowAppParser extends 
XTestCase {
         try {
             invokeForkJoin(parser, def);
             fail("Expected to catch an exception but did not encounter any");
-        } catch (Exception ex) {
-            WorkflowException we = (WorkflowException) ex.getCause();
-            assertEquals(ErrorCode.E0732, we.getErrorCode());
-            assertTrue(we.getMessage().contains("Fork [f]"));
-            assertTrue(we.getMessage().contains("Join [j1]") && 
we.getMessage().contains("been [j2]")
-                    || we.getMessage().contains("Join [j2]") && 
we.getMessage().contains("been [j1]"));
+        } catch (WorkflowException we) {
+            assertEquals(ErrorCode.E0757, we.getErrorCode());
+            assertTrue(we.getMessage().contains("Fork node [f]"));
+            assertTrue(we.getMessage().contains("[j2,j1]"));
         }
     }
 
@@ -1425,29 +1408,54 @@ public class TestLiteWorkflowAppParser extends 
XTestCase {
         try {
             invokeForkJoin(parser, def);
             fail("Expected to catch an exception but did not encounter any");
-        } catch (Exception ex) {
-            WorkflowException we = (WorkflowException) ex.getCause();
+        } catch (WorkflowException we) {
             assertEquals(ErrorCode.E0744, we.getErrorCode());
             assertTrue(we.getMessage().contains("fork, [f],"));
             assertTrue(we.getMessage().contains("node, [two]"));
         }
     }
 
-    // Invoke private validateForkJoin method using Reflection API
-    private void invokeForkJoin(LiteWorkflowAppParser parser, LiteWorkflowApp 
def) throws Exception {
-        Class<? extends LiteWorkflowAppParser> c = parser.getClass();
-        Class<?> d = 
Class.forName("org.apache.oozie.workflow.lite.LiteWorkflowAppParser$VisitStatus");
-        Field f = d.getField("VISITING");
-        Map traversed = new HashMap();
-        traversed.put(def.getNode(StartNodeDef.START).getName(), f);
-        Method validate = c.getDeclaredMethod("validate", 
LiteWorkflowApp.class, NodeDef.class, Map.class);
-        validate.setAccessible(true);
-        // invoke validate method to populate the fork and join list
-        validate.invoke(parser, def, def.getNode(StartNodeDef.START), 
traversed);
-        Method validateForkJoin = c.getDeclaredMethod("validateForkJoin", 
LiteWorkflowApp.class);
-        validateForkJoin.setAccessible(true);
-        // invoke validateForkJoin
-        validateForkJoin.invoke(parser, def);
+    /**
+     * Runs the workflow validation of wf-long.xml on a separate thread and fails if the validation throws or has not
+     * completed within 2000 ms (scaled by XTestCase.WAITFOR_RATIO).
+     */
+    @SuppressWarnings("deprecation")
+    public void testForkJoinValidationTime() throws Exception {
+        final LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+
+        final LiteWorkflowApp app = parser.validateAndParse(IOUtils.getResourceAsReader("wf-long.xml", -1),
+                new Configuration());
+
+        final AtomicBoolean failure = new AtomicBoolean(false);
+        final AtomicBoolean finished = new AtomicBoolean(false);
+
+        Runnable r = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    invokeForkJoin(parser, app);
+                    finished.set(true);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    failure.set(true);
+                }
+            }
+        };
+
+        Thread t = new Thread(r);
+        t.start();
+        t.join((long) (2000 * XTestCase.WAITFOR_RATIO));
+
+        // Check for a validation error first: a run that threw never sets "finished", so testing "finished" first
+        // would misreport a validation failure as a timeout.
+        assertFalse("Workflow validation failed", failure.get());
+
+        if (!finished.get()) {
+            t.stop();  // don't let the validation keep running in the background which causes high CPU load
+            fail("Workflow validation did not finish in time");
+        }
+    }
+
+    // Validates the given workflow definition with a fresh LiteWorkflowValidator. The parser argument is not used.
+    // NOTE(review): the "true" flag presumably turns on fork-join validation - confirm against validateWorkflow().
+    private void invokeForkJoin(LiteWorkflowAppParser parser, LiteWorkflowApp def) throws WorkflowException {
+        new LiteWorkflowValidator().validateWorkflow(def, true);
     }
 
     // If Xerces 2.10.0 is not explicitly listed as a dependency in the poms, 
then Java will revert to an older version that has

Reply via email to