Modified: 
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
URL: 
http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java?view=diff&rev=469170&r1=469169&r2=469170
==============================================================================
--- 
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java 
(original)
+++ 
incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java 
Mon Oct 30 07:57:02 2006
@@ -18,6 +18,11 @@
  */
 package org.apache.ode.jacob.vpu;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.jacob.Channel;
@@ -36,506 +41,456 @@
 import org.apache.ode.utils.ObjectPrinter;
 import org.apache.ode.utils.msg.MessageBundle;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-
-
 /**
  * The JACOB Virtual Processing Unit ("VPU").
- *
+ * 
  * @author Maciej Szefler <a href="mailto:[EMAIL PROTECTED]" />
  */
 public final class JacobVPU {
-  /** Class-level logger. */
-  private static final Log __log = LogFactory.getLog(JacobVPU.class);
+    private static final Log __log = LogFactory.getLog(JacobVPU.class);
+
+    /** 
+     * Internationalization messages.
+     */
+    private static final JacobMessages __msgs = 
MessageBundle.getMessages(JacobMessages.class);
+
+    /** 
+     * Thread-local for associating a thread with a VPU. 
+     */
+    static final ThreadLocal<JacobThread> __activeJacobThread = new 
ThreadLocal<JacobThread>();
+
+    private static final Method REDUCE_METHOD;
+
+    /** 
+     * Resolve the [EMAIL PROTECTED] JacobRunnable#run} method statically
+     */
+    static {
+        try {
+            REDUCE_METHOD = JacobRunnable.class.getMethod("run", 
ArrayUtils.EMPTY_CLASS_ARRAY);
+        } catch (Exception e) {
+            throw new Error("Cannot resolve 'run' method", e);
+        }
+    }
+
+    /** 
+     * Persisted cross-VPU state (state of the channels)
+     */
+    private ExecutionQueue _executionQueue;
+
+    private Map<Class, Object> _extensions = new HashMap<Class, Object>();
+
+    /** 
+     * Classloader used for loading object continuations.
+     */
+    private ClassLoader _classLoader = getClass().getClassLoader();
+
+    private int _cycle;
+
+    private Statistics _statistics = new Statistics();
+
+    /** 
+     * The fault "register" of the VPU . 
+     */
+    private RuntimeException _fault;
+
+    /**
+     * Default constructor.
+     */
+    public JacobVPU() {
+    }
+
+    /**
+     * Re-hydration constructor.
+     * 
+     * @param executionQueue previously saved execution context
+     */
+    public JacobVPU(ExecutionQueue executionQueue) {
+        setContext(executionQueue);
+    }
+
+    /**
+     * Instantiation constructor; used to initialize context with the 
concretion
+     * of a process abstraction.
+     * 
+     * @param context virgin context object
+     * @param concretion the process
+     */
+    public JacobVPU(ExecutionQueue context, JacobRunnable concretion) {
+        setContext(context);
+        inject(concretion);
+    }
+
+    /**
+     * Execute one VPU cycle.
+     * 
+     * @return <code>true</code> if the run queue is not empty after this 
cycle, <code>false</code> otherwise.
+     */
+    public boolean execute() {
+        if (__log.isTraceEnabled()) {
+            __log.trace(ObjectPrinter.stringifyMethodEnter("execute", 
ArrayUtils.EMPTY_OBJECT_ARRAY));
+        }
+        if (_executionQueue == null) {
+            throw new IllegalStateException("No state object for VPU!");
+        }
+        if (_fault != null) {
+            throw _fault;
+        }
+        if (!_executionQueue.hasReactions()) {
+            return false;
+        }
+        _cycle = _executionQueue.cycle();
+
+        Continuation rqe = _executionQueue.dequeueReaction();
+        JacobThreadImpl jt = new JacobThreadImpl(rqe);
+
+        long ctime = System.currentTimeMillis();
+        try {
+            jt.run();
+        } catch (RuntimeException re) {
+            _fault = re;
+            throw re;
+        }
+
+        long rtime = System.currentTimeMillis() - ctime;
+        ++_statistics.numCycles;
+        _statistics.totalRunTimeMs += rtime;
+        _statistics.incRunTime(jt._targetStr, rtime);
+        return true;
+    }
+
+    public void flush() {
+        if (__log.isTraceEnabled()) {
+            __log.trace(ObjectPrinter.stringifyMethodEnter("flush", 
ArrayUtils.EMPTY_OBJECT_ARRAY));
+        }
+        _executionQueue.flush();
+    }
+
+    /**
+     * Set the state of of the VPU; this is analagous to loading a CPU with a
+     * thread's context (re-hydration).
+     * 
+     * @param executionQueue
+     *            process executionQueue (state)
+     */
+    public void setContext(ExecutionQueue executionQueue) {
+        if (__log.isTraceEnabled()) {
+            __log.trace(ObjectPrinter.stringifyMethodEnter("setContext",
+                    new Object[] { "executionQueue", executionQueue }));
+        }
+        _executionQueue = executionQueue;
+        _executionQueue.setClassLoader(_classLoader);
+    }
 
-  /** Internationalization messages. */
-  private static final JacobMessages __msgs = 
MessageBundle.getMessages(JacobMessages.class);
+    public void registerExtension(Class extensionClass, Object obj) {
+        if (__log.isTraceEnabled()) {
+            __log.trace(ObjectPrinter
+                    .stringifyMethodEnter("registerExtension", new Object[] {
+                            "extensionClass", extensionClass, "obj", obj }));
+        }
+        _extensions.put(extensionClass, obj);
+    }
+
+    /**
+     * Add an item to the run queue.
+     */
+    public void addReaction(JacobObject jo, Method method, Object[] args, 
String desc) {
+        if (__log.isTraceEnabled()) {
+            __log.trace(ObjectPrinter.stringifyMethodEnter("addReaction",
+                    new Object[] { "jo", jo, "method", method, "args", args, 
"desc", desc }));
+        }
 
-  /** Thread-local for associating a thread with a VPU. */
-  static final ThreadLocal<JacobThread> __activeJacobThread = new 
ThreadLocal<JacobThread>();
+        Continuation continuation = new Continuation(jo, method, args);
+        continuation.setDescription(desc);
+        _executionQueue.enqueueReaction(continuation);
+        ++_statistics.runQueueEntries;
+    }
 
-  private static final Method REDUCE_METHOD;
-
-  /** Pre-fetch the [EMAIL PROTECTED] JacobRunnable#run} method */
-  static {
-    Method rm = null;
-
-    try {
-      rm = JacobRunnable.class.getMethod("run", ArrayUtils.EMPTY_CLASS_ARRAY);
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-
-    REDUCE_METHOD = rm;
-  }
-
-  /** Persisted cross-VPU state (state of the channels) */
-  private ExecutionQueue _executionQueue;
-
-  private Map<Class, Object> _extensions = new HashMap<Class, Object>();
-
-  /** Classloader used for loading object continuations. */
-  private ClassLoader _classLoader = getClass().getClassLoader();
-
-  private int _cycle;
-
-  private Statistics _statistics = new Statistics();
-
-  /** The fault "register" of the VPU . */
-  private RuntimeException _fault;
-
-  /**
-   * Default constructor.
-   */
-  public JacobVPU() {
-  }
-
-  /**
-   * Re-hydration constructor.
-   * @param executionQueue previously saved execution context
-   */
-  public JacobVPU(ExecutionQueue executionQueue) {
-    this();
-    setContext(executionQueue);
-  }
-
-  /**
-   * Instantiation constructor; used to initialize context with
-   * the concretion of a process abstraction.
-   * @param context virgin context object
-   * @param concretion the process
-   */
-  public JacobVPU(ExecutionQueue context, JacobRunnable concretion) {
-    setContext(context);
-    inject(concretion);
-  }
-
-  /**
-   * Execute one VPU cycle.
-   * @return <code>true</code> if the run queue is not empty after this cycle,
-   *         <code>false</code> otherwise.
-   */
-  public boolean execute() {
-    if (__log.isTraceEnabled())
-      __log.trace(ObjectPrinter.stringifyMethodEnter("execute", 
ArrayUtils.EMPTY_OBJECT_ARRAY));
-
-    if (_executionQueue == null)
-      throw new IllegalStateException("No state object for VPU!");
-    
-    if (_fault != null) {
-      throw _fault;
-    }
-
-    if (!_executionQueue.hasReactions()) {
-      return false;
-    }
-
-    _cycle = _executionQueue.cycle();
-
-    Continuation rqe = _executionQueue.dequeueReaction();
-    JacobThreadImpl jt = new JacobThreadImpl(rqe);
-
-    long ctime = System.currentTimeMillis();
-    try {
-      jt.run();
-    } catch (RuntimeException re) {
-      _fault = re;
-      throw re;
-    }
-
-    long rtime = System.currentTimeMillis() - ctime;
-    ++_statistics.numCycles;
-    _statistics.totalRunTimeMs += rtime;
-    _statistics.incRunTime(jt._targetStr, rtime);
-    return true;
-  }
-
-  public void flush() {
-    if (__log.isTraceEnabled())
-      __log.trace(ObjectPrinter.stringifyMethodEnter("flush", 
ArrayUtils.EMPTY_OBJECT_ARRAY));
-    _executionQueue.flush();
-  }
-
-  /**
-   * DOCUMENTME
-   */
-  public void reset() {
-    if (__log.isTraceEnabled())
-      __log.trace(ObjectPrinter.stringifyMethodEnter("reset", 
ArrayUtils.EMPTY_OBJECT_ARRAY));
-  }
-
-  /**
-   * Set the state of of the VPU; this is analagous to loading a CPU with
-   * a thread's context (re-hydration).
-   * @param executionQueue process executionQueue (state)
-   */
-  public void setContext(ExecutionQueue executionQueue) {
-    if (__log.isTraceEnabled())
-      __log.trace(ObjectPrinter.stringifyMethodEnter("setContext", new 
Object[] {"soupDao", executionQueue} ));
-    _executionQueue = executionQueue;
-    _executionQueue.setClassLoader(_classLoader);
-  }
-
-
-  public void registerExtension(Class extensionClass, Object obj) {
-    if (__log.isTraceEnabled())
-      __log.trace(ObjectPrinter.stringifyMethodEnter("registerExtension", new 
Object[] {
-        "extensionClass", extensionClass,
-        "obj", obj
-      } ));
-
-    _extensions.put(extensionClass, obj);
-  }
-
-  /**
-   * Add an item to the run queue.
-   */
-  public void addReaction(JacobObject jmb, Method method, Object[] args, 
String desc) {
-    if (__log.isTraceEnabled())
-      __log.trace(ObjectPrinter.stringifyMethodEnter("addReaction", new 
Object[] {
-        "jmb", jmb,
-        "method", method,
-        "args", args,
-        "desc", desc
-      }));
-
-    Continuation continuation = new Continuation(jmb, method, args);
-    continuation.setDescription(desc);
-    _executionQueue.enqueueReaction(continuation);
-    ++_statistics.runQueueEntries;
-  }
-
-  /**
-   * Get the active Jacob thread, i.e. the one associated with the current
-   * Java thread.
-   *
-   * @return Jacob thread ([EMAIL PROTECTED] JacobThread}) associated with the 
current Java thread
-   * @see JacobThread
-   */
-  public static JacobThread activeJacobThread() {
-    return __activeJacobThread.get();
-  }
-
-
-
-
-  /**
-   * Inject a concretion into the process context. This amounts to
-   * chaning the process context from <code>P</code> to <code>P|Q</code>
-   * where <code>P</code> is the previous process context and
-   * <code>Q</code> is the injected process. This method is equivalent
-   * to the parallel operator, but is intended to be used from outside
-   * of an active [EMAIL PROTECTED] JacobThread}.
-   * @param concretion the concretion to inject into the process context
-   */
-  public void inject(JacobRunnable concretion) {
-    if (__log.isTraceEnabled())
-      __log.trace(ObjectPrinter.stringifyMethodEnter("inject", new Object[] { 
"concretion", concretion }));
-
-
-    if (__log.isDebugEnabled())
-      __log.debug("injecting " + concretion);
-    addReaction(concretion, REDUCE_METHOD, ArrayUtils.EMPTY_OBJECT_ARRAY,
-            (__log.isInfoEnabled() ? concretion.toString() : null));
-  }
-
-  static String stringifyMethods(Class kind) {
-    StringBuffer buf = new StringBuffer();
-    Method[] methods = kind.getMethods();
-    boolean found = false;
-
-    for (int i = 0; i < methods.length; ++i) {
-      if (methods[i].getDeclaringClass() == Object.class) {
-        continue;
-      }
-
-      if (found) {
-        buf.append(" & ");
-      }
-
-      buf.append(methods[i].getName());
-      buf.append('(');
-
-      Class[] argTypes = methods[i].getParameterTypes();
-
-      for (int j = 0; j < argTypes.length; ++j) {
-        if (j > 0) {
-          buf.append(", ");
-        }
-
-        buf.append(argTypes[j].getName());
-      }
-
-      buf.append(") {...}");
-      found = true;
-    }
-
-    return buf.toString();
-  }
-
-
-  static String stringify(Object[] list) {
-    if (list == null) {
-      return "";
-    }
-
-    StringBuffer buf = new StringBuffer();
-
-    for (int i = 0; i < list.length; ++i) {
-      if (i > 0) {
-        buf.append(',');
-      }
-
-      buf.append(list[i]);
-    }
-
-    return buf.toString();
-  }
-
-  public void setClassLoader(ClassLoader classLoader) {
-    _classLoader = classLoader;
-    if (_executionQueue != null)
-      _executionQueue.setClassLoader(classLoader);
-  }
-
-
-  /**
-   * Dump the state of the VPU for debugging purposes.
-   */
-  public void dumpState() {
-    _statistics.printToStream(System.err);
-    _executionQueue.dumpState(System.err);
-  }
-
-  public boolean isComplete() {
-    return _executionQueue.isComplete();
-  }
-
-
-  private class JacobThreadImpl implements Runnable, JacobThread {
-    private final JacobObject _methodBody;
-    private final Object[] _args;
-    private final Method _method;
-    private String _prefix;
-
-    /** Text string identifying the left side of the reduction (for debug). */
-    private String _source;
-
-    /** Text string identifying the target class and method (for debug) .*/
-    private String _targetStr = "Unknown";
-
-    JacobThreadImpl(Continuation rqe) {
-      assert rqe != null;
-
-      _methodBody = rqe.getClosure();
-      _args = rqe.getArgs();
-      _source = rqe.getDescription();
-      _method = rqe.getMethod();
-      
-      if(__log.isDebugEnabled()){
-       StringBuffer buf = new StringBuffer(_methodBody.getClass().getName());
-       buf.append('.');
-        buf.append(rqe.getMethod());
-        _targetStr = buf.toString();
-      }
-     
-    }
-
-    public void instance(JacobRunnable template) {
-      String desc = null;
-      if (__log.isDebugEnabled()){
-        __log.debug(_cycle + ": " + _prefix +  template);
-        desc = template.toString();
-      }
-
-      _statistics.numReductionsStruct++;
-      addReaction(template, REDUCE_METHOD, ArrayUtils.EMPTY_OBJECT_ARRAY, 
desc);
-    }
-
-    public Channel message(Channel channel, Method method, Object[] args) {
-      if (__log.isDebugEnabled())
-        __log.debug(_cycle + ": " + _prefix  + channel + " ! "
-                + method.getName() + "(" + stringify(args) + ")");
-
-      _statistics.messagesSent++;
-
-      SynchChannel replyChannel = null;
-
-      // Check for synchronous methods; create a synchronization channel
-      if (method.getReturnType() != void.class) {
-        if (method.getReturnType() != SynchChannel.class)
-          throw new IllegalStateException("ChannelListener method can only 
return SynchChannel: " + method);
-        replyChannel = (SynchChannel) newChannel(SynchChannel.class,"", "Reply 
Channel");
-        Object[] newArgs = new Object[args.length + 1];
-        System.arraycopy(args, 0, newArgs, 0, args.length);
-        newArgs[args.length] = replyChannel;
-        args = newArgs;
-      }
-      CommChannel chnl = (CommChannel) ChannelFactory.getBackend(channel);
-      CommGroup grp = new CommGroup(false);
-      CommSend send = new CommSend(chnl, method, args);
-
-      grp.add(send);
-      _executionQueue.add(grp);
-
-      return replyChannel;
-    }
-
-    public Channel newChannel(Class channelType, String creator,
-                              String description) {
-      CommChannel chnl = new CommChannel(channelType);
-      chnl.setDescription(description);
-      _executionQueue.add(chnl);
-
-      // Some of the debug information is a bit lengthy...
-      //cframe.setDebugInfo(fillDebugInfo());
-
-      Channel ret = ChannelFactory.createChannel(chnl, channelType);
-      if (__log.isDebugEnabled())
-        __log.debug(_cycle + ": " + _prefix + "new " + ret );
-
-      _statistics.channelsCreated++;
-      return ret;
-
-    }
-
-    public String exportChannel(Channel channel) {
-      if (__log.isDebugEnabled())
-        __log.debug(_cycle + ": " + _prefix + "export<" + channel + ">");
-
-      CommChannel chnl = (CommChannel)ChannelFactory.getBackend(channel);
-      return _executionQueue.createExport(chnl);
-    }
-
-    public Channel importChannel(String channelId, Class channelType) {
-      try {
-        CommChannel cframe = _executionQueue.consumeExport(channelId);
-        return ChannelFactory.createChannel(cframe, channelType);
-      } catch (RuntimeException re) {
-        throw re;
-      }
+    /**
+     * Get the active Jacob thread, i.e. the one associated with the current 
Java thread.
+     */
+    public static JacobThread activeJacobThread() {
+        return __activeJacobThread.get();
     }
 
     /**
-     * @see JacobThread#object
+     * Inject a concretion into the process context. This amounts to chaning 
the
+     * process context from <code>P</code> to <code>P|Q</code> where
+     * <code>P</code> is the previous process context and <code>Q</code> is
+     * the injected process. This method is equivalent to the parallel 
operator,
+     * but is intended to be used from outside of an active [EMAIL PROTECTED] 
JacobThread}.
      */
-    public void object(boolean replicate,  ChannelListener[] ml) {
-      if (__log.isDebugEnabled()) {
-        StringBuffer msg = new StringBuffer();
-        msg.append(_cycle);
-        msg.append(": ");
-        msg.append(_prefix);
-        for (int i = 0 ; i < ml.length; ++i) {
-          if (i != 0)
-            msg.append(" + ");
-          msg.append(ml[i].getChannel());
-          msg.append(" ? ");
-          msg.append(ml.toString());
-
-        }
-        __log.debug(msg.toString());
-      }
-
-      _statistics.numContinuations++;
-
-      CommGroup grp = new CommGroup(replicate);
-      for (int i = 0; i < ml.length; ++i) {
-        CommChannel chnl = (CommChannel) 
ChannelFactory.getBackend(ml[i].getChannel());
-        // TODO see below..
-        // oframe.setDebugInfo(fillDebugInfo());
-        CommRecv recv = new CommRecv(chnl,ml[i]);
-        grp.add(recv);
-      }
-
-      _executionQueue.add(grp);
-
-    }
-
-    public void object(boolean replicate, ChannelListener methodList) throws 
IllegalArgumentException {
-      object(replicate, new ChannelListener[] { methodList } );
-    }
-
-//    private DebugInfo fillDebugInfo() {
-//      // Some of the debug information is a bit lengthy, so lets not put it 
in
-//      // all the time... eh.
-//      if (_debug) {
-//        DebugInfo frame  = new DebugInfo();
-//        frame.setCreator(_source);
-//        Exception ex = new Exception();
-//        StackTraceElement[] st = ex.getStackTrace();
-//        if (st.length > 2) {
-//          StackTraceElement[] stcut = new StackTraceElement[st.length - 2];
-//          System.arraycopy(st, 2, stcut, 0, stcut.length);
-//          frame.setLocation(stcut);
-//        }
-//
-//        return frame;
-//      }
-//      return null;
-//    }
-
-    public Object getExtension(Class extensionClass) {
-      return _extensions.get(extensionClass);
-    }
-
-    public void run() {
-      assert _methodBody != null;
-
-      assert _method != null;
-      assert _method.getDeclaringClass()
-                    .isAssignableFrom(_methodBody.getClass());
-      assert __activeJacobThread.get() == null;
-
-
-      if (__log.isDebugEnabled()) {
-        String dbgMsg = _cycle + ": " + _source;
-        __log.debug(dbgMsg);
-        _prefix = "     ===> ";
-      }
-
-      Object[] args;
-      SynchChannel synchChannel;
-      if (_method.getReturnType() != void.class) {
-        args = new Object[_args.length-1];
-        System.arraycopy(_args,0, args, 0, args.length);
-        synchChannel = (SynchChannel)_args[args.length];
-      } else {
-        args = _args;
-        synchChannel = null;
-      }
-      __activeJacobThread.set(this);
-      long ctime = System.currentTimeMillis();
-      try {
-        _method.invoke(_methodBody, args);
-        if (synchChannel != null)
-          synchChannel.ret();
-      } catch (IllegalAccessException iae) {
-        String msg = __msgs.msgMethodNotAccessible(_method.getName(), 
_method.getDeclaringClass().getName());
-        __log.error(msg, iae);
-        throw new RuntimeException(msg, iae);
-      } catch (InvocationTargetException e) {
-        String msg = __msgs.msgClientMethodException(_method.getName(), 
_methodBody.getClass().getName());
-        __log.error(msg, e.getTargetException());
-        throw new RuntimeException(e.getTargetException());
-      } finally {
-        ctime = System.currentTimeMillis() - ctime;
-        _statistics.totalClientTimeMs += ctime;
-        __activeJacobThread.set(null);
-        _prefix = null;
-      }
-
-      assert __activeJacobThread.get() == null;
-    }
-
-    public String toString() {
-      StringBuffer buf = new StringBuffer("PT[ ");
-      buf.append(_methodBody);
-      buf.append(" ]");
-      return buf.toString();
+    public void inject(JacobRunnable concretion) {
+        if (__log.isDebugEnabled()) {
+            __log.debug("injecting " + concretion);
+        }
+        addReaction(concretion, REDUCE_METHOD, ArrayUtils.EMPTY_OBJECT_ARRAY,
+                (__log.isInfoEnabled() ? concretion.toString() : null));
+    }
+
+    static String stringifyMethods(Class kind) {
+        StringBuffer buf = new StringBuffer();
+        Method[] methods = kind.getMethods();
+        boolean found = false;
+
+        for (int i = 0; i<methods.length; ++i) {
+            if (methods[i].getDeclaringClass() == Object.class) {
+                continue;
+            }
+            if (found) {
+                buf.append(" & ");
+            }
+            buf.append(methods[i].getName()).append('(');
+            Class[] argTypes = methods[i].getParameterTypes();
+            for (int j = 0; j < argTypes.length; ++j) {
+                if (j > 0) {
+                    buf.append(", ");
+                }
+                buf.append(argTypes[j].getName());
+            }
+            buf.append(") {...}");
+            found = true;
+        }
+        return buf.toString();
+    }
+
+    static String stringify(Object[] list) {
+        if (list == null) {
+            return "";
+        }
+        StringBuffer buf = new StringBuffer();
+        for (int i = 0; i < list.length; ++i) {
+            if (i > 0) {
+                buf.append(',');
+            }
+            buf.append(list[i]);
+        }
+        return buf.toString();
+    }
+
+    public void setClassLoader(ClassLoader classLoader) {
+        _classLoader = classLoader;
+        if (_executionQueue != null) {
+            _executionQueue.setClassLoader(classLoader);
+        }
+    }
+
+    /**
+     * Dump the state of the VPU for debugging purposes.
+     */
+    public void dumpState() {
+        _statistics.printToStream(System.err);
+        _executionQueue.dumpState(System.err);
+    }
+
+    public boolean isComplete() {
+        return _executionQueue.isComplete();
+    }
+
+    private class JacobThreadImpl implements Runnable, JacobThread {
+        private final JacobObject _methodBody;
+
+        private final Object[] _args;
+
+        private final Method _method;
+
+        private String _prefix;
+
+        /** Text string identifying the left side of the reduction (for 
debug). */
+        private String _source;
+
+        /** Text string identifying the target class and method (for debug) . 
*/
+        private String _targetStr = "Unknown";
+
+        JacobThreadImpl(Continuation rqe) {
+            assert rqe != null;
+
+            _methodBody = rqe.getClosure();
+            _args = rqe.getArgs();
+            _source = rqe.getDescription();
+            _method = rqe.getMethod();
+
+            if (__log.isDebugEnabled()) {
+                StringBuffer buf = new 
StringBuffer(_methodBody.getClass().getName());
+                buf.append('.');
+                buf.append(rqe.getMethod());
+                _targetStr = buf.toString();
+            }
+        }
+
+        public void instance(JacobRunnable template) {
+            String desc = null;
+            if (__log.isDebugEnabled()) {
+                __log.debug(_cycle + ": " + _prefix + template);
+                desc = template.toString();
+            }
+            _statistics.numReductionsStruct++;
+            addReaction(template, REDUCE_METHOD, 
ArrayUtils.EMPTY_OBJECT_ARRAY, desc);
+        }
+
+        public Channel message(Channel channel, Method method, Object[] args) {
+            if (__log.isDebugEnabled()) {
+                __log.debug(_cycle + ": " + _prefix + channel + " ! "
+                        + method.getName() + "(" + stringify(args) + ")");
+            }
+            _statistics.messagesSent++;
+
+            SynchChannel replyChannel = null;
+            // Check for synchronous methods; create a synchronization channel
+            if (method.getReturnType() != void.class) {
+                if (method.getReturnType() != SynchChannel.class) {
+                    throw new IllegalStateException(
+                            "ChannelListener method can only return 
SynchChannel: " + method);
+                }
+                replyChannel = (SynchChannel) newChannel(SynchChannel.class, 
"", "Reply Channel");
+                Object[] newArgs = new Object[args.length + 1];
+                System.arraycopy(args, 0, newArgs, 0, args.length);
+                newArgs[args.length] = replyChannel;
+                args = newArgs;
+            }
+            CommChannel chnl = (CommChannel) 
ChannelFactory.getBackend(channel);
+            CommGroup grp = new CommGroup(false);
+            CommSend send = new CommSend(chnl, method, args);
+            grp.add(send);
+            _executionQueue.add(grp);
+            return replyChannel;
+        }
+
+        public Channel newChannel(Class channelType, String creator, String 
description) {
+            CommChannel chnl = new CommChannel(channelType);
+            chnl.setDescription(description);
+            _executionQueue.add(chnl);
+
+            Channel ret = ChannelFactory.createChannel(chnl, channelType);
+            if (__log.isDebugEnabled())
+                __log.debug(_cycle + ": " + _prefix + "new " + ret);
+
+            _statistics.channelsCreated++;
+            return ret;
+        }
+
+        public String exportChannel(Channel channel) {
+            if (__log.isDebugEnabled()) {
+                __log.debug(_cycle + ": " + _prefix + "export<" + channel + 
">");
+            }
+            CommChannel chnl = (CommChannel) 
ChannelFactory.getBackend(channel);
+            return _executionQueue.createExport(chnl);
+        }
+
+        public Channel importChannel(String channelId, Class channelType) {
+            CommChannel cframe = _executionQueue.consumeExport(channelId);
+            return ChannelFactory.createChannel(cframe, channelType);
+        }
+
+        public void object(boolean replicate, ChannelListener[] ml) {
+            if (__log.isDebugEnabled()) {
+                StringBuffer msg = new StringBuffer();
+                msg.append(_cycle);
+                msg.append(": ");
+                msg.append(_prefix);
+                for (int i = 0; i < ml.length; ++i) {
+                    if (i != 0)
+                        msg.append(" + ");
+                    msg.append(ml[i].getChannel());
+                    msg.append(" ? ");
+                    msg.append(ml.toString());
+
+                }
+                __log.debug(msg.toString());
+            }
+
+            _statistics.numContinuations++;
+
+            CommGroup grp = new CommGroup(replicate);
+            for (int i = 0; i < ml.length; ++i) {
+                CommChannel chnl = (CommChannel) ChannelFactory
+                        .getBackend(ml[i].getChannel());
+                // TODO see below..
+                // oframe.setDebugInfo(fillDebugInfo());
+                CommRecv recv = new CommRecv(chnl, ml[i]);
+                grp.add(recv);
+            }
+            _executionQueue.add(grp);
+        }
+
+        public void object(boolean replicate, ChannelListener methodList)
+                throws IllegalArgumentException {
+            object(replicate, new ChannelListener[] { methodList });
+        }
+
+        /* UNUSED
+         private DebugInfo fillDebugInfo() {
+            // Some of the debug information is a bit lengthy, so lets not put
+            // it in all the time... eh.
+            DebugInfo frame = new DebugInfo();
+            frame.setCreator(_source);
+            Exception ex = new Exception();
+            StackTraceElement[] st = ex.getStackTrace();
+            if (st.length > 2) {
+                StackTraceElement[] stcut = new StackTraceElement[st.length - 
2];
+                System.arraycopy(st, 2, stcut, 0, stcut.length);
+                frame.setLocation(stcut);
+            }
+
+            return frame;
+        }
+        */
+        
+        public Object getExtension(Class extensionClass) {
+            return _extensions.get(extensionClass);
+        }
+
+        public void run() {
+            assert _methodBody != null;
+            assert _method != null;
+            assert 
_method.getDeclaringClass().isAssignableFrom(_methodBody.getClass());
+            assert __activeJacobThread.get() == null;
+
+            if (__log.isTraceEnabled()) {
+                String dbgMsg = _cycle + ": " + _source;
+                __log.debug(dbgMsg);
+                _prefix = "     ===> ";
+            }
+
+            Object[] args;
+            SynchChannel synchChannel;
+            if (_method.getReturnType() != void.class) {
+                args = new Object[_args.length - 1];
+                System.arraycopy(_args, 0, args, 0, args.length);
+                synchChannel = (SynchChannel) _args[args.length];
+            } else {
+                args = _args;
+                synchChannel = null;
+            }
+            __activeJacobThread.set(this);
+            long ctime = System.currentTimeMillis();
+            try {
+                _method.invoke(_methodBody, args);
+                if (synchChannel != null) {
+                    synchChannel.ret();
+                }
+            } catch (IllegalAccessException iae) {
+                String msg = __msgs.msgMethodNotAccessible(_method.getName(),
+                        _method.getDeclaringClass().getName());
+                __log.error(msg, iae);
+                throw new RuntimeException(msg, iae);
+            } catch (InvocationTargetException e) {
+                String msg = __msgs.msgClientMethodException(_method.getName(),
+                        _methodBody.getClass().getName());
+                __log.error(msg, e.getTargetException());
+                throw new RuntimeException(e.getTargetException());
+            } finally {
+                ctime = System.currentTimeMillis() - ctime;
+                _statistics.totalClientTimeMs += ctime;
+                __activeJacobThread.set(null);
+                _prefix = null;
+            }
+
+            assert __activeJacobThread.get() == null;
+        }
+
+        public String toString() {
+            return "PT[ " + _methodBody + " ]";
+        }
     }
-  }
 
 }


Reply via email to