Author: jstrachan
Date: Fri Aug 25 00:49:16 2006
New Revision: 436701
URL: http://svn.apache.org/viewvc?rev=436701&view=rev
Log:
simplified the workflow code to make it alittle easier to follow and reduce the
possibility of timing issues together with fixing of a few timing related
issues in the join support & activity classes to ensure we don't start things
twice or set things to be started after they've stopped
Modified:
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/AbstractActivity.java
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinSupport.java
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Workflow.java
incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/ParallelActivityTest.java
Modified:
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/AbstractActivity.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/AbstractActivity.java?rev=436701&r1=436700&r2=436701&view=diff
==============================================================================
---
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/AbstractActivity.java
(original)
+++
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/AbstractActivity.java
Fri Aug 25 00:49:16 2006
@@ -45,7 +45,7 @@
public void start() {
if (state.compareAndSet(Transitions.Initialised,
Transitions.Starting)) {
doStart();
- state.set(Transitions.Started);
+ state.compareAndSet(Transitions.Starting, Transitions.Started);
}
}
Modified:
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinSupport.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinSupport.java?rev=436701&r1=436700&r2=436701&view=diff
==============================================================================
---
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinSupport.java
(original)
+++
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/JoinSupport.java
Fri Aug 25 00:49:16 2006
@@ -17,7 +17,9 @@
package org.apache.servicemix.beanflow;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
/**
* A useful base class for a activity which joins on the success of a
collection
@@ -28,6 +30,7 @@
public abstract class JoinSupport extends TimeoutActivity {
private List<Activity> children = new ArrayList<Activity>();
+ private Set<Activity> toBeStarted = new HashSet();
public JoinSupport() {
}
@@ -37,6 +40,7 @@
for (Activity activity : activities) {
activity.getState().addRunnable(this);
children.add(activity);
+ toBeStarted.add(activity);
}
}
}
@@ -46,6 +50,7 @@
for (Activity activity : activities) {
activity.getState().addRunnable(this);
children.add(activity);
+ toBeStarted.add(activity);
}
}
}
@@ -91,9 +96,10 @@
// lets make sure that the child activities are started properly
synchronized (children) {
- for (Activity child : children) {
+ for (Activity child : toBeStarted) {
child.start();
}
+ toBeStarted.clear();
}
}
Modified:
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Workflow.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Workflow.java?rev=436701&r1=436700&r2=436701&view=diff
==============================================================================
---
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Workflow.java
(original)
+++
incubator/servicemix/trunk/servicemix-beanflow/src/main/java/org/apache/servicemix/beanflow/Workflow.java
Fri Aug 25 00:49:16 2006
@@ -23,8 +23,10 @@
import org.apache.servicemix.beanflow.support.ReflectionInterpreter;
import java.util.Timer;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -38,10 +40,9 @@
private Executor executor;
private Interpreter interpreter;
- private State<T> step;
- private T nextStep;
private Timer timer = new Timer();
private AtomicBoolean suspended = new AtomicBoolean();
+ private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
/**
* TODO is there a way to reference the parameter type of this class?
@@ -57,65 +58,53 @@
public Workflow(T firstStep) {
this(Executors.newSingleThreadExecutor(), firstStep);
}
-
+
public Workflow(Executor executor, T firstStep) {
- this(executor, new ReflectionInterpreter(), new
DefaultState<T>(firstStep));
+ this(executor, new ReflectionInterpreter(), firstStep);
}
-
- public Workflow(Executor executor, Interpreter interpreter, State<T> step)
{
+
+ public Workflow(Executor executor, Interpreter interpreter, T firstStep) {
this.executor = executor;
this.interpreter = interpreter;
- this.step = step;
-
- T firstStep = step.get();
+
if (firstStep instanceof Enum) {
validateStepsExist(firstStep.getClass());
}
+ setNextStep(firstStep);
}
/**
* Returns the next step which will be executed asynchronously
*/
public T getNextStep() {
- return nextStep;
+ return queue.peek();
}
/**
* Sets the next step to be executed when the current step completes
*/
public void setNextStep(T stepName) {
- this.nextStep = stepName;
- suspended.set(false);
- nextStep();
+ queue.add(stepName);
+ executor.execute(this);
}
public void run() {
- if (!isSuspended() && !isStopped()) {
- T stepToExecute = step.get();
- if (stepToExecute != null) {
- if (log.isDebugEnabled()) {
- log.debug("About to execute step: " + stepToExecute);
+ while (!isStopped()) {
+ try {
+ T stepToExecute = queue.poll();
+ if (stepToExecute != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("About to execute step: " + stepToExecute);
+ }
+ interpreter.executeStep(stepToExecute, this);
+ }
+ else {
+ break;
}
-
- interpreter.executeStep(stepToExecute, this);
- nextStep();
}
- }
- }
-
- public void nextStep() {
- if (nextStep != null) {
- T stepToExecute = nextStep;
- nextStep = null;
- // lets fire any conditions
- // This very function is a listener of step, so setting the step
- // will trigger ourself. We just need to return now.
- step.set(stepToExecute);
- }
-
- // if we are not stopped lets add a task to re-evaluate ourself
- if (!isStopped() && !isSuspended()) {
- executor.execute(this);
+ catch (RuntimeException e) {
+ log.warn("Caught: " + e, e);
+ }
}
}
@@ -180,7 +169,7 @@
* Returns true if this workflow has a next step to execute
*/
public boolean isNextStepAvailable() {
- return nextStep != null;
+ return !queue.isEmpty();
}
/**
@@ -188,10 +177,8 @@
*/
public Runnable createGoToStepTask(final T joinedStep) {
return new Runnable() {
-
public void run() {
setNextStep(joinedStep);
- //nextStep();
}
};
}
Modified:
incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/ParallelActivityTest.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/ParallelActivityTest.java?rev=436701&r1=436700&r2=436701&view=diff
==============================================================================
---
incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/ParallelActivityTest.java
(original)
+++
incubator/servicemix/trunk/servicemix-beanflow/src/test/java/org/apache/servicemix/beanflow/ParallelActivityTest.java
Fri Aug 25 00:49:16 2006
@@ -36,7 +36,7 @@
// START SNIPPET: example
ExampleParallelBean parallelBean = new ExampleParallelBean();
ParallelActivity activity =
ParallelActivity.newParallelMethodActivity(executor, parallelBean);
- activity.startWithTimeout(timer, 20000);
+ activity.startWithTimeout(timer, 2000);
// END SNIPPET: example
activity.join(10, TimeUnit.SECONDS);