Author: mriou
Date: Tue Nov 21 10:10:29 2006
New Revision: 477796

URL: http://svn.apache.org/viewvc?view=rev&rev=477796
Log:
Making the VPU reentrant. Necessary for in-thread process-to-process 
interactions.

Modified:
    incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java

Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java?view=diff&rev=477796&r1=477795&r2=477796
==============================================================================
--- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java (original)
+++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java Tue Nov 21 10:10:29 2006
@@ -18,29 +18,20 @@
  */
 package org.apache.ode.jacob.vpu;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.ode.jacob.Channel;
-import org.apache.ode.jacob.ChannelListener;
-import org.apache.ode.jacob.JacobObject;
-import org.apache.ode.jacob.JacobRunnable;
-import org.apache.ode.jacob.JacobThread;
-import org.apache.ode.jacob.SynchChannel;
-import org.apache.ode.jacob.soup.CommChannel;
-import org.apache.ode.jacob.soup.CommGroup;
-import org.apache.ode.jacob.soup.CommRecv;
-import org.apache.ode.jacob.soup.CommSend;
-import org.apache.ode.jacob.soup.Continuation;
-import org.apache.ode.jacob.soup.ExecutionQueue;
+import org.apache.ode.jacob.*;
+import org.apache.ode.jacob.soup.*;
 import org.apache.ode.utils.ArrayUtils;
 import org.apache.ode.utils.ObjectPrinter;
 import org.apache.ode.utils.msg.MessageBundle;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+
 /**
  * The JACOB Virtual Processing Unit ("VPU").
  * 
@@ -55,9 +46,9 @@
     private static final JacobMessages __msgs = MessageBundle.getMessages(JacobMessages.class);
 
     /** 
-     * Thread-local for associating a thread with a VPU. 
+     * Thread-local for associating a thread with a VPU. Needs to be stored in a stack to allow reentrance.
      */
-    static final ThreadLocal<JacobThread> __activeJacobThread = new ThreadLocal<JacobThread>();
+    static final ThreadLocal<Stack<JacobThread>> __activeJacobThread = new ThreadLocal<Stack<JacobThread>>();
 
     private static final Method REDUCE_METHOD;
 
@@ -209,7 +200,7 @@
      * Get the active Jacob thread, i.e. the one associated with the current Java thread.
      */
     public static JacobThread activeJacobThread() {
-        return __activeJacobThread.get();
+        return __activeJacobThread.get().peek();
     }
 
     /**
@@ -232,15 +223,15 @@
         Method[] methods = kind.getMethods();
         boolean found = false;
 
-        for (int i = 0; i<methods.length; ++i) {
-            if (methods[i].getDeclaringClass() == Object.class) {
+        for (Method method : methods) {
+            if (method.getDeclaringClass() == Object.class) {
                 continue;
             }
             if (found) {
                 buf.append(" & ");
             }
-            buf.append(methods[i].getName()).append('(');
-            Class[] argTypes = methods[i].getParameterTypes();
+            buf.append(method.getName()).append('(');
+            Class[] argTypes = method.getParameterTypes();
             for (int j = 0; j < argTypes.length; ++j) {
                 if (j > 0) {
                     buf.append(", ");
@@ -443,7 +434,6 @@
             assert _methodBody != null;
             assert _method != null;
             assert _method.getDeclaringClass().isAssignableFrom(_methodBody.getClass());
-            assert __activeJacobThread.get() == null;
 
             if (__log.isTraceEnabled()) {
                 String dbgMsg = _cycle + ": " + _source;
@@ -461,7 +451,7 @@
                 args = _args;
                 synchChannel = null;
             }
-            __activeJacobThread.set(this);
+            stackThread();
             long ctime = System.currentTimeMillis();
             try {
                 _method.invoke(_methodBody, args);
@@ -481,15 +471,28 @@
             } finally {
                 ctime = System.currentTimeMillis() - ctime;
                 _statistics.totalClientTimeMs += ctime;
-                __activeJacobThread.set(null);
+                unstackThread();
                 _prefix = null;
             }
-
-            assert __activeJacobThread.get() == null;
         }
 
         public String toString() {
             return "PT[ " + _methodBody + " ]";
+        }
+
+        private void stackThread() {
+            Stack<JacobThread> currStack = __activeJacobThread.get();
+            if (currStack == null) {
+                currStack = new Stack<JacobThread>();
+                __activeJacobThread.set(currStack);
+            }
+            currStack.push(this);
+        }
+
+        private JacobThread unstackThread() {
+            Stack<JacobThread> currStack = __activeJacobThread.get();
+            assert currStack != null;
+            return currStack.pop();
         }
     }
 


Reply via email to