Author: mriou
Date: Tue Nov 21 10:10:29 2006
New Revision: 477796
URL: http://svn.apache.org/viewvc?view=rev&rev=477796
Log:
Making the VPU reentrant. Necessary for in-thread process-to-process
interactions.
Modified:
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
Modified:
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
URL:
http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java?view=diff&rev=477796&r1=477795&r2=477796
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java Tue Nov 21 10:10:29 2006
@@ -18,29 +18,20 @@
*/
package org.apache.ode.jacob.vpu;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.ode.jacob.Channel;
-import org.apache.ode.jacob.ChannelListener;
-import org.apache.ode.jacob.JacobObject;
-import org.apache.ode.jacob.JacobRunnable;
-import org.apache.ode.jacob.JacobThread;
-import org.apache.ode.jacob.SynchChannel;
-import org.apache.ode.jacob.soup.CommChannel;
-import org.apache.ode.jacob.soup.CommGroup;
-import org.apache.ode.jacob.soup.CommRecv;
-import org.apache.ode.jacob.soup.CommSend;
-import org.apache.ode.jacob.soup.Continuation;
-import org.apache.ode.jacob.soup.ExecutionQueue;
+import org.apache.ode.jacob.*;
+import org.apache.ode.jacob.soup.*;
import org.apache.ode.utils.ArrayUtils;
import org.apache.ode.utils.ObjectPrinter;
import org.apache.ode.utils.msg.MessageBundle;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+
/**
* The JACOB Virtual Processing Unit ("VPU").
*
@@ -55,9 +46,9 @@
private static final JacobMessages __msgs =
MessageBundle.getMessages(JacobMessages.class);
/**
- * Thread-local for associating a thread with a VPU.
+ * Thread-local for associating a thread with a VPU. Needs to be stored in a stack to allow reentrance.
*/
- static final ThreadLocal<JacobThread> __activeJacobThread = new ThreadLocal<JacobThread>();
+ static final ThreadLocal<Stack<JacobThread>> __activeJacobThread = new ThreadLocal<Stack<JacobThread>>();
private static final Method REDUCE_METHOD;
@@ -209,7 +200,7 @@
* Get the active Jacob thread, i.e. the one associated with the current Java thread.
*/
public static JacobThread activeJacobThread() {
- return __activeJacobThread.get();
+ return __activeJacobThread.get().peek();
}
/**
@@ -232,15 +223,15 @@
Method[] methods = kind.getMethods();
boolean found = false;
- for (int i = 0; i<methods.length; ++i) {
- if (methods[i].getDeclaringClass() == Object.class) {
+ for (Method method : methods) {
+ if (method.getDeclaringClass() == Object.class) {
continue;
}
if (found) {
buf.append(" & ");
}
- buf.append(methods[i].getName()).append('(');
- Class[] argTypes = methods[i].getParameterTypes();
+ buf.append(method.getName()).append('(');
+ Class[] argTypes = method.getParameterTypes();
for (int j = 0; j < argTypes.length; ++j) {
if (j > 0) {
buf.append(", ");
@@ -443,7 +434,6 @@
assert _methodBody != null;
assert _method != null;
assert _method.getDeclaringClass().isAssignableFrom(_methodBody.getClass());
- assert __activeJacobThread.get() == null;
if (__log.isTraceEnabled()) {
String dbgMsg = _cycle + ": " + _source;
@@ -461,7 +451,7 @@
args = _args;
synchChannel = null;
}
- __activeJacobThread.set(this);
+ stackThread();
long ctime = System.currentTimeMillis();
try {
_method.invoke(_methodBody, args);
@@ -481,15 +471,28 @@
} finally {
ctime = System.currentTimeMillis() - ctime;
_statistics.totalClientTimeMs += ctime;
- __activeJacobThread.set(null);
+ unstackThread();
_prefix = null;
}
-
- assert __activeJacobThread.get() == null;
}
public String toString() {
return "PT[ " + _methodBody + " ]";
+ }
+
+ private void stackThread() {
+ Stack<JacobThread> currStack = __activeJacobThread.get();
+ if (currStack == null) {
+ currStack = new Stack<JacobThread>();
+ __activeJacobThread.set(currStack);
+ }
+ currStack.push(this);
+ }
+
+ private JacobThread unstackThread() {
+ Stack<JacobThread> currStack = __activeJacobThread.get();
+ assert currStack != null;
+ return currStack.pop();
}
}