Copied: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java (from r426596, incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/Soup.java) URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java?p2=incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java&p1=incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/Soup.java&r1=426596&r2=426609&rev=426609&view=diff ============================================================================== --- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/Soup.java (original) +++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueue.java Fri Jul 28 09:39:45 2006 @@ -21,13 +21,13 @@ import java.io.PrintStream; /** - * The soup, the reactive "broth" that underlies the JACOB system. The [EMAIL PROTECTED] Soup} + * The soup, the reactive "broth" that underlies the JACOB system. The [EMAIL PROTECTED] ExecutionQueue} * implementation is responsible for implementing the JACOB reactive rules and * maintaining the state of the reactive broth. * * @author Maciej Szefler <a href="mailto:[EMAIL PROTECTED]">mbs</a> */ -public interface Soup { +public interface ExecutionQueue { /** * Are there any reactions that can be executed in the broth? @@ -37,14 +37,14 @@ boolean hasReactions(); /** - * Add a reaction to the broth. This operation is sometimes + * Add a continuation to the broth. This operation is sometimes * referred to as an "injection"; it can be used to inject into the - * broth the "original" reaction. - * @param reaction the [EMAIL PROTECTED] Reaction} to add to the broth + * broth the "original" continuation. + * @param continuation the [EMAIL PROTECTED] Continuation} to add to the broth */ - public void enqueueReaction(Reaction reaction); + public void enqueueReaction(Continuation continuation); - public Reaction dequeueReaction(); + public Continuation dequeueReaction(); public void add(CommChannel channel);
Copied: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java (from r426596, incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/SoupObject.java) URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java?p2=incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java&p1=incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/SoupObject.java&r1=426596&r2=426609&rev=426609&view=diff ============================================================================== --- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/SoupObject.java (original) +++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/soup/ExecutionQueueObject.java Fri Jul 28 09:39:45 2006 @@ -19,12 +19,12 @@ package org.apache.ode.jacob.soup; /** - * Base class for items we find in the [EMAIL PROTECTED] Soup}. + * Base class for items we find in the [EMAIL PROTECTED] ExecutionQueue}. * <p>Created on Feb 17, 2004 at 3:44:24 PM.</p> * * @author Maciej Szefler <a href="mailto:[EMAIL PROTECTED]">mbs</a> */ -public class SoupObject { +public class ExecutionQueueObject { /** A unique idefntifer for this object in the soup (should only be set by soup). */ private Object _id; @@ -53,8 +53,8 @@ } public boolean equals(Object obj) { - if (_id == null || ((SoupObject)obj)._id == null) + if (_id == null || ((ExecutionQueueObject)obj)._id == null) return this==obj; - return ((SoupObject)obj)._id.equals(_id); + return ((ExecutionQueueObject)obj)._id.equals(_id); } } Copied: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java (from r426596, incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/FastSoupImpl.java) URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java?p2=incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java&p1=incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/FastSoupImpl.java&r1=426596&r2=426609&rev=426609&view=diff ============================================================================== --- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/FastSoupImpl.java (original) +++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java Fri Jul 28 09:39:45 2006 @@ -18,45 +18,66 @@ */ package org.apache.ode.jacob.vpu; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.ode.jacob.Channel; -import org.apache.ode.jacob.JavaClosure; -import org.apache.ode.jacob.ML; -import org.apache.ode.jacob.soup.*; +import org.apache.ode.jacob.ChannelListener; +import org.apache.ode.jacob.JacobObject; +import org.apache.ode.jacob.soup.Comm; +import org.apache.ode.jacob.soup.CommChannel; +import org.apache.ode.jacob.soup.CommGroup; +import org.apache.ode.jacob.soup.CommRecv; +import org.apache.ode.jacob.soup.CommSend; +import org.apache.ode.jacob.soup.Continuation; +import org.apache.ode.jacob.soup.ExecutionQueue; +import org.apache.ode.jacob.soup.ExecutionQueueObject; +import org.apache.ode.jacob.soup.ReplacementMap; import org.apache.ode.utils.ArrayUtils; import org.apache.ode.utils.ObjectPrinter; -import java.io.*; +import java.io.Externalizable; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInput; +import java.io.ObjectInputStream; +import java.io.ObjectOutput; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; +import java.io.OutputStream; +import java.io.PrintStream; +import java.io.Serializable; import java.lang.reflect.Field; import java.lang.reflect.Method; -import java.util.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** - * A fast, in-memory [EMAIL PROTECTED] org.apache.ode.jacob.soup.Soup} implementation. + * A fast, in-memory [EMAIL PROTECTED] org.apache.ode.jacob.soup.ExecutionQueue} implementation. */ -public class FastSoupImpl implements Soup { +public class ExecutionQueueImpl implements ExecutionQueue { /** Class-level logger. */ - private static final Log __log = LogFactory.getLog(FastSoupImpl.class); + private static final Log __log = LogFactory.getLog(ExecutionQueueImpl.class); private ClassLoader _classLoader; /** - * Cached set of enqueued [EMAIL PROTECTED] Reaction} objects (i.e. those reed using the - * [EMAIL PROTECTED] #enqueueReaction(org.apache.ode.jacob.soup.Reaction) method}). These reactions are + * Cached set of enqueued [EMAIL PROTECTED] Continuation} objects (i.e. those reed using the + * [EMAIL PROTECTED] #enqueueReaction(org.apache.ode.jacob.soup.Continuation) method}). These reactions are * "cached"--that is it is not sent directly to the DAO layer--to minimize * unnecessary serialization/deserialization of closures. This is a pretty useful - * optimization, as most [EMAIL PROTECTED] Reaction}s are enqueued, and then immediately - * dequeued in the next cycle. By caching [EMAIL PROTECTED] Reaction}s, we eliminate practically + * optimization, as most [EMAIL PROTECTED] Continuation}s are enqueued, and then immediately + * dequeued in the next cycle. By caching [EMAIL PROTECTED] Continuation}s, we eliminate practically * all serialization of these objects, the only exception being cases where the * system decides to stop processing a particular soup despite the soup being * able to make forward progress; this scenario would occur if a maximum processign * time-per-instance policy were in effect. */ - private Set<Reaction> _reactions = new HashSet<Reaction>(); + private Set<Continuation> _reactions = new HashSet<Continuation>(); private Map<Integer, ChannelFrame> _channels = new HashMap<Integer, ChannelFrame>(); /** The "expected" cycle counter, use to detect database serialization issues. */ @@ -64,11 +85,11 @@ private int _objIdCounter; - private SoupStatistics _statistics = new SoupStatistics(); + private ExecutionQueueStatistics _statistics = new ExecutionQueueStatistics(); private ReplacementMap _replacementMap; private Serializable _gdata; - public FastSoupImpl(ClassLoader classLoader) { + public ExecutionQueueImpl(ClassLoader classLoader) { _classLoader = classLoader; } @@ -91,32 +112,32 @@ } - public void enqueueReaction(Reaction reaction) { + public void enqueueReaction(Continuation continuation) { if (__log.isTraceEnabled()) - __log.trace(ObjectPrinter.stringifyMethodEnter("enqueueReaction", new Object[] { "reaction", reaction} )); + __log.trace(ObjectPrinter.stringifyMethodEnter("enqueueReaction", new Object[] { "continuation", continuation} )); - verifyNew(reaction); - _reactions.add(reaction); + verifyNew(continuation); + _reactions.add(continuation); } - public Reaction dequeueReaction() { + public Continuation dequeueReaction() { if (__log.isTraceEnabled()) { __log.trace(ObjectPrinter.stringifyMethodEnter("dequeueReaction", ArrayUtils.EMPTY_OBJECT_ARRAY)); } - Reaction reaction = null; + Continuation continuation = null; if (!_reactions.isEmpty()) { Iterator it = _reactions.iterator(); - reaction = (Reaction)it.next(); + continuation = (Continuation)it.next(); it.remove(); } - // At this point it is wise to clone the reaction, so that we do not have weird + // At this point it is wise to clone the continuation, so that we do not have weird // concurrency issues. We only clone the closure, the arguments should not be // a problem. -// Reaction clone = new Reaction(cloneClosure(reaction.getClosure()), reaction.getMethod(), reaction.getArgs()); -// clone.setDescription(reaction.getDescription()); +// Continuation clone = new Continuation(cloneClosure(continuation.getClosure()), continuation.getMethod(), continuation.getArgs()); +// clone.setDescription(continuation.getDescription()); // return clone; - return reaction; + return continuation; } public void add(CommGroup group) { @@ -213,13 +234,13 @@ _channels.clear(); _reactions.clear(); - SoupInputStream sis = new SoupInputStream(iis); + ExecutionQueueInputStream sis = new ExecutionQueueInputStream(iis); _objIdCounter = sis.readInt(); _currentCycle = sis.readInt(); int reactions = sis.readInt(); for (int i = 0; i < reactions; ++i) { - JavaClosure closure = (JavaClosure)sis.readObject(); + JacobObject closure = (JacobObject)sis.readObject(); String methodName = sis.readUTF(); Method method = closure.getMethod(methodName); int numArgs = sis.readInt(); @@ -227,7 +248,7 @@ for (int j = 0; j < numArgs; ++j) { args[j] = sis.readObject(); } - _reactions.add(new Reaction(closure, method, args)); + _reactions.add(new Continuation(closure, method, args)); } int numChannels = sis.readInt(); @@ -253,7 +274,7 @@ public void write(OutputStream oos) throws IOException { flush(); - SoupOutputStream sos = new SoupOutputStream(oos); + ExecutionQueueOutputStream sos = new ExecutionQueueOutputStream(oos); sos.writeInt(_objIdCounter); sos.writeInt(_currentCycle); @@ -261,12 +282,12 @@ // Write out the reactions. sos.writeInt(_reactions.size()); for (Iterator i = _reactions.iterator();i.hasNext(); ) { - Reaction reaction = (Reaction) i.next(); - sos.writeObject(reaction.getClosure()); - sos.writeUTF(reaction.getMethod().getName()); - sos.writeInt(reaction.getArgs() == null ? 0 : reaction.getArgs().length); - for (int j = 0; reaction.getArgs() != null && j < reaction.getArgs().length; ++j) - sos.writeObject(reaction.getArgs()[j]); + Continuation continuation = (Continuation) i.next(); + sos.writeObject(continuation.getClosure()); + sos.writeUTF(continuation.getMethod().getName()); + sos.writeInt(continuation.getArgs() == null ? 0 : continuation.getArgs().length); + for (int j = 0; continuation.getArgs() != null && j < continuation.getArgs().length; ++j) + sos.writeObject(continuation.getArgs()[j]); } sos.writeInt(_channels.values().size()); @@ -338,8 +359,8 @@ ps.println("-- REACTIONS"); int cnt =0; for (Iterator i = _reactions.iterator(); i.hasNext();) { - Reaction reaction = (Reaction) i.next(); - ps.println(" #" + (++cnt) + ": " + reaction.toString()); + Continuation continuation = (Continuation) i.next(); + ps.println(" #" + (++cnt) + ": " + continuation.toString()); } } } @@ -355,11 +376,11 @@ MessageFrame mframe = cframe.msgFrames.iterator().next(); ObjectFrame oframe = cframe.objFrames.iterator().next(); - Reaction reaction = new Reaction(oframe.continuation, oframe.continuation.getMethod(mframe.method), mframe.args); + Continuation continuation = new Continuation(oframe._continuation, oframe._continuation.getMethod(mframe.method), mframe.args); if(__log.isInfoEnabled()) { - reaction.setDescription(channel + " ? {...} | " + channel + " ! " + mframe.method + "(...)"); + continuation.setDescription(channel + " ? {...} | " + channel + " ! " + mframe.method + "(...)"); } - enqueueReaction(reaction); + enqueueReaction(continuation); if (!mframe.commGroupFrame.replicated) { removeCommGroup(mframe.commGroupFrame); } @@ -374,16 +395,16 @@ } // TODO revisit: apparently dead wood -// private JavaClosure cloneClosure(JavaClosure closure) { +// private JacobObject cloneClosure(JacobObject closure) { // long startTime = System.currentTimeMillis(); // try { // ByteArrayOutputStream bos = new ByteArrayOutputStream(10000); -// SoupOutputStream sos = new SoupOutputStream(bos); +// ExecutionQueueOutputStream sos = new ExecutionQueueOutputStream(bos); // sos.writeObject(closure); // sos.close(); // long readStart = System.currentTimeMillis(); -// SoupInputStream cis = new SoupInputStream(new ByteArrayInputStream(bos.toByteArray())); -// JavaClosure ret = (JavaClosure) cis.readObject(); +// ExecutionQueueInputStream cis = new ExecutionQueueInputStream(new ByteArrayInputStream(bos.toByteArray())); +// JacobObject ret = (JacobObject) cis.readObject(); // cis.close(); // // long copyTime = System.currentTimeMillis() - startTime; @@ -400,22 +421,22 @@ // } // return ret; // } catch (Exception ex) { -// throw new RuntimeException("Internal Error in FastSoupImpl.java", ex); +// throw new RuntimeException("Internal Error in ExecutionQueueImpl.java", ex); // } // } /** - * Verify that a [EMAIL PROTECTED] SoupObject} is new, that is it has not already been + * Verify that a [EMAIL PROTECTED] ExecutionQueueObject} is new, that is it has not already been * added to the soup. * @param so object to check. * @throws IllegalArgumentException in case the object is not new */ - private void verifyNew(SoupObject so) throws IllegalArgumentException { + private void verifyNew(ExecutionQueueObject so) throws IllegalArgumentException { if (so.getId() != null) throw new IllegalArgumentException("The object " + so + " is not new!"); } - private void assignId(SoupObject so, Object id) { + private void assignId(ExecutionQueueObject so, Object id) { so.setId(id); } @@ -561,23 +582,23 @@ } private static class ObjectFrame extends CommFrame implements Externalizable { - ML continuation; + ChannelListener _continuation; public ObjectFrame() { super() ; } - public ObjectFrame(CommGroupFrame commGroupFrame, ChannelFrame channelFrame, ML continuation) { + public ObjectFrame(CommGroupFrame commGroupFrame, ChannelFrame channelFrame, ChannelListener continuation) { super(commGroupFrame, channelFrame); - this.continuation = continuation; + this._continuation = continuation; } public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { super.readExternal(in); - continuation = (ML)in.readObject(); + _continuation = (ChannelListener)in.readObject(); } public void writeExternal(ObjectOutput out) throws IOException { super.writeExternal(out); - out.writeObject(continuation); + out.writeObject(_continuation); } } @@ -619,10 +640,10 @@ * * @author Maciej Szefler <a href="mailto:[EMAIL PROTECTED]">mbs</a> */ - private class SoupOutputStream extends ObjectOutputStream { + private class ExecutionQueueOutputStream extends ObjectOutputStream { private Set<Object> _serializedChannels = new HashSet<Object>(); - public SoupOutputStream(OutputStream outputStream) throws IOException { + public ExecutionQueueOutputStream(OutputStream outputStream) throws IOException { super(new GZIPOutputStream(outputStream)); enableReplaceObject(true); } @@ -670,10 +691,10 @@ /** */ - public class SoupInputStream extends ObjectInputStream { + public class ExecutionQueueInputStream extends ObjectInputStream { private Set<CommChannel> _deserializedChannels = new HashSet<CommChannel>(); - public SoupInputStream(InputStream in) throws IOException { + public ExecutionQueueInputStream(InputStream in) throws IOException { super(new GZIPInputStream(in)); enableResolveObject(true); } @@ -751,7 +772,7 @@ } - private static final class SoupStatistics { + private static final class ExecutionQueueStatistics { public long cloneClosureTimeMs; public long cloneClosureBytes; public long cloneClousreCount; Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java?rev=426609&r1=426608&r2=426609&view=diff ============================================================================== --- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java (original) +++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobMessages.java Fri Jul 28 09:39:45 2006 @@ -50,13 +50,13 @@ } /** - * Error indicating that a re-hydration of a saved continuation object could + * Error indicating that a re-hydration of a saved _continuation object could * not be completed. * * @param channel - * channel with the dangling continuation + * channel with the dangling _continuation * @param mlClassName - * name of de-hydrated [EMAIL PROTECTED] org.apache.ode.jacob.ML} object + * name of de-hydrated [EMAIL PROTECTED] org.apache.ode.jacob.ChannelListener} object * */ public String msgContHydrationErr(String channel, String mlClassName) { Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java?rev=426609&r1=426608&r2=426609&view=diff ============================================================================== --- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java (original) +++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java Fri Jul 28 09:39:45 2006 @@ -18,8 +18,20 @@ */ package org.apache.ode.jacob.vpu; -import org.apache.ode.jacob.*; -import org.apache.ode.jacob.soup.*; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ode.jacob.Channel; +import org.apache.ode.jacob.ChannelListener; +import org.apache.ode.jacob.JacobObject; +import org.apache.ode.jacob.JacobRunnable; +import org.apache.ode.jacob.JacobThread; +import org.apache.ode.jacob.SynchChannel; +import org.apache.ode.jacob.soup.CommChannel; +import org.apache.ode.jacob.soup.CommGroup; +import org.apache.ode.jacob.soup.CommRecv; +import org.apache.ode.jacob.soup.CommSend; +import org.apache.ode.jacob.soup.Continuation; +import org.apache.ode.jacob.soup.ExecutionQueue; import org.apache.ode.utils.ArrayUtils; import org.apache.ode.utils.ObjectPrinter; import org.apache.ode.utils.msg.MessageBundle; @@ -29,9 +41,6 @@ import java.util.HashMap; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** * The JACOB Virtual Processing Unit ("VPU"). @@ -50,12 +59,12 @@ private static final Method REDUCE_METHOD; - /** Pre-fetch the [EMAIL PROTECTED] Abstraction#self} method */ + /** Pre-fetch the [EMAIL PROTECTED] JacobRunnable#self} method */ static { Method rm = null; try { - rm = Abstraction.class.getMethod("self", ArrayUtils.EMPTY_CLASS_ARRAY); + rm = JacobRunnable.class.getMethod("self", ArrayUtils.EMPTY_CLASS_ARRAY); } catch (Exception e) { e.printStackTrace(); } @@ -64,7 +73,7 @@ } /** Persisted cross-VPU state (state of the channels) */ - private Soup _soup; + private ExecutionQueue _executionQueue; private Map<Class, Object> _extensions = new HashMap<Class, Object>(); @@ -88,11 +97,11 @@ /** * Re-hydration constructor. - * @param soup previously saved execution context + * @param executionQueue previously saved execution context */ - public JacobVPU(Soup soup) { + public JacobVPU(ExecutionQueue executionQueue) { this(); - setContext(soup); + setContext(executionQueue); } /** @@ -101,7 +110,7 @@ * @param context virgin context object * @param concretion the process */ - public JacobVPU(Soup context, Abstraction concretion) { + public JacobVPU(ExecutionQueue context, JacobRunnable concretion) { setContext(context); inject(concretion); } @@ -115,20 +124,20 @@ if (__log.isTraceEnabled()) __log.trace(ObjectPrinter.stringifyMethodEnter("execute", ArrayUtils.EMPTY_OBJECT_ARRAY)); - if (_soup == null) + if (_executionQueue == null) throw new IllegalStateException("No state object for VPU!"); if (_fault != null) { throw _fault; } - if (!_soup.hasReactions()) { + if (!_executionQueue.hasReactions()) { return false; } - _cycle = _soup.cycle(); + _cycle = _executionQueue.cycle(); - Reaction rqe = _soup.dequeueReaction(); + Continuation rqe = _executionQueue.dequeueReaction(); JacobThreadImpl jt = new JacobThreadImpl(rqe); long ctime = System.currentTimeMillis(); @@ -149,7 +158,7 @@ public void flush() { if (__log.isTraceEnabled()) __log.trace(ObjectPrinter.stringifyMethodEnter("flush", ArrayUtils.EMPTY_OBJECT_ARRAY)); - _soup.flush(); + _executionQueue.flush(); } /** @@ -163,13 +172,13 @@ /** * Set the state of of the VPU; this is analagous to loading a CPU with * a thread's context (re-hydration). - * @param soup process soup (state) + * @param executionQueue process executionQueue (state) */ - public void setContext(Soup soup) { + public void setContext(ExecutionQueue executionQueue) { if (__log.isTraceEnabled()) - __log.trace(ObjectPrinter.stringifyMethodEnter("setContext", new Object[] {"soupDao", soup} )); - _soup = soup; - _soup.setClassLoader(_classLoader); + __log.trace(ObjectPrinter.stringifyMethodEnter("setContext", new Object[] {"soupDao", executionQueue} )); + _executionQueue = executionQueue; + _executionQueue.setClassLoader(_classLoader); } @@ -186,7 +195,7 @@ /** * Add an item to the run queue. */ - public void addReaction(JavaClosure jmb, Method method, Object[] args, String desc) { + public void addReaction(JacobObject jmb, Method method, Object[] args, String desc) { if (__log.isTraceEnabled()) __log.trace(ObjectPrinter.stringifyMethodEnter("addReaction", new Object[] { "jmb", jmb, @@ -195,9 +204,9 @@ "desc", desc })); - Reaction reaction = new Reaction(jmb, method, args); - reaction.setDescription(desc); - _soup.enqueueReaction(reaction); + Continuation continuation = new Continuation(jmb, method, args); + continuation.setDescription(desc); + _executionQueue.enqueueReaction(continuation); ++_statistics.runQueueEntries; } @@ -224,7 +233,7 @@ * of an active [EMAIL PROTECTED] JacobThread}. * @param concretion the concretion to inject into the process context */ - public void inject(Abstraction concretion) { + public void inject(JacobRunnable concretion) { if (__log.isTraceEnabled()) __log.trace(ObjectPrinter.stringifyMethodEnter("inject", new Object[] { "concretion", concretion })); @@ -290,8 +299,8 @@ public void setClassLoader(ClassLoader classLoader) { _classLoader = classLoader; - if (_soup != null) - _soup.setClassLoader(classLoader); + if (_executionQueue != null) + _executionQueue.setClassLoader(classLoader); } @@ -300,16 +309,16 @@ */ public void dumpState() { _statistics.printToStream(System.err); - _soup.dumpState(System.err); + _executionQueue.dumpState(System.err); } public boolean isComplete() { - return _soup.isComplete(); + return _executionQueue.isComplete(); } private class JacobThreadImpl implements Runnable, JacobThread { - private final JavaClosure _methodBody; + private final JacobObject _methodBody; private final Object[] _args; private final Method _method; private String _prefix; @@ -320,7 +329,7 @@ /** Text string identifying the target class and method (for debug) .*/ private String _targetStr = "Unknown"; - JacobThreadImpl(Reaction rqe) { + JacobThreadImpl(Continuation rqe) { assert rqe != null; _methodBody = rqe.getClosure(); @@ -337,7 +346,7 @@ } - public void instance(Abstraction template) { + public void instance(JacobRunnable template) { String desc = null; if (__log.isDebugEnabled()){ __log.debug(_cycle + ": " + _prefix + template); @@ -360,7 +369,7 @@ // Check for synchronous methods; create a synchronization channel if (method.getReturnType() != void.class) { if (method.getReturnType() != SynchChannel.class) - throw new IllegalStateException("ML method can only return SynchChannel: " + method); + throw new IllegalStateException("ChannelListener method can only return SynchChannel: " + method); replyChannel = (SynchChannel) newChannel(SynchChannel.class,"", "Reply Channel"); Object[] newArgs = new Object[args.length + 1]; System.arraycopy(args, 0, newArgs, 0, args.length); @@ -372,7 +381,7 @@ CommSend send = new CommSend(chnl, method, args); grp.add(send); - _soup.add(grp); + _executionQueue.add(grp); return replyChannel; } @@ -381,7 +390,7 @@ String description) { CommChannel chnl = new CommChannel(channelType); chnl.setDescription(description); - _soup.add(chnl); + _executionQueue.add(chnl); // Some of the debug information is a bit lengthy... //cframe.setDebugInfo(fillDebugInfo()); @@ -400,12 +409,12 @@ __log.debug(_cycle + ": " + _prefix + "export<" + channel + ">"); CommChannel chnl = (CommChannel)ChannelFactory.getBackend(channel); - return _soup.createExport(chnl); + return _executionQueue.createExport(chnl); } public Channel importChannel(String channelId, Class channelType) { try { - CommChannel cframe = _soup.consumeExport(channelId); + CommChannel cframe = _executionQueue.consumeExport(channelId); return ChannelFactory.createChannel(cframe, channelType); } catch (RuntimeException re) { throw re; @@ -415,7 +424,7 @@ /** * @see JacobThread#object */ - public void object(boolean replicate, ML[] ml) { + public void object(boolean replicate, ChannelListener[] ml) { if (__log.isDebugEnabled()) { StringBuffer msg = new StringBuffer(); msg.append(_cycle); @@ -443,12 +452,12 @@ grp.add(recv); } - _soup.add(grp); + _executionQueue.add(grp); } - public void object(boolean replicate, ML methodList) throws IllegalArgumentException { - object(replicate, new ML[] { methodList } ); + public void object(boolean replicate, ChannelListener methodList) throws IllegalArgumentException { + object(replicate, new ChannelListener[] { methodList } ); } // private DebugInfo fillDebugInfo() { Modified: incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/Statistics.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/Statistics.java?rev=426609&r1=426608&r2=426609&view=diff ============================================================================== --- incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/Statistics.java (original) +++ incubator/ode/trunk/jacob/src/main/java/org/apache/ode/jacob/vpu/Statistics.java Fri Jul 28 09:39:45 2006 @@ -48,7 +48,7 @@ /** Number of continuations. */ public long numContinuations; - /** Total size of all continuation (in bytes). */ + /** Total size of all _continuation (in bytes). */ public long totalContinuationBytes; /** Number of enqueues to the run queue. */ Modified: incubator/ode/trunk/jacob/src/test/java/org/apache/ode/jacob/SynchTest.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/test/java/org/apache/ode/jacob/SynchTest.java?rev=426609&r1=426608&r2=426609&view=diff ============================================================================== --- incubator/ode/trunk/jacob/src/test/java/org/apache/ode/jacob/SynchTest.java (original) +++ incubator/ode/trunk/jacob/src/test/java/org/apache/ode/jacob/SynchTest.java Fri Jul 28 09:39:45 2006 @@ -29,7 +29,7 @@ public class SynchTest extends TestCase { // TODO still needed? apparently unused.. - private static final class P extends Abstraction { + private static final class P extends JacobRunnable { public void self() { } } Modified: incubator/ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/cell/JacobCellTest.java URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/cell/JacobCellTest.java?rev=426609&r1=426608&r2=426609&view=diff ============================================================================== --- incubator/ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/cell/JacobCellTest.java (original) +++ incubator/ode/trunk/jacob/src/test/java/org/apache/ode/jacob/examples/cell/JacobCellTest.java Fri Jul 28 09:39:45 2006 @@ -18,19 +18,16 @@ */ package org.apache.ode.jacob.examples.cell; -import java.io.ByteArrayOutputStream; -import java.io.IOException; - import junit.framework.TestCase; - -import org.apache.ode.jacob.Abstraction; +import org.apache.ode.jacob.JacobRunnable; import org.apache.ode.jacob.ValChannel; -import org.apache.ode.jacob.ValML; -import org.apache.ode.jacob.examples.cell.CELL_; -import org.apache.ode.jacob.examples.cell.CellChannel; -import org.apache.ode.jacob.vpu.FastSoupImpl; +import org.apache.ode.jacob.ValChannelListener; +import org.apache.ode.jacob.vpu.ExecutionQueueImpl; import org.apache.ode.jacob.vpu.JacobVPU; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + public class JacobCellTest extends TestCase { private static Object _val; @@ -39,7 +36,7 @@ } public void testJacobCell1() throws IOException { - FastSoupImpl fsoup = new FastSoupImpl(null); + ExecutionQueueImpl fsoup = new ExecutionQueueImpl(null); JacobVPU vpu = new JacobVPU(fsoup, new CellTest1()); @@ -56,13 +53,13 @@ assertEquals("foo", _val); } - static class CellTest1 extends Abstraction { + static class CellTest1 extends JacobRunnable { public void self() { CellChannel cellChannel = newChannel(CellChannel.class, "cell"); ValChannel retChannel = newChannel(ValChannel.class, "val"); instance(new CELL_<String>(cellChannel, "foo")); - object(new ValML(retChannel) { + object(new ValChannelListener(retChannel) { public void val(Object retVal) { _val = retVal; } @@ -71,7 +68,7 @@ } } - private static class Compute extends Abstraction { + private static class Compute extends JacobRunnable { ValChannel _out; int _x; @@ -87,7 +84,7 @@ } // TODO still needed? - private static class Foo extends Abstraction { + private static class Foo extends JacobRunnable { public void self() { ValChannel print = newChannel(ValChannel.class); instance(new Compute(1, print)); @@ -97,7 +94,7 @@ } } - private static class Print extends Abstraction { + private static class Print extends JacobRunnable { ValChannel _val; public Print(ValChannel val) { @@ -105,7 +102,7 @@ } public void self() { - object(new ValML(_val) { + object(new ValChannelListener(_val) { public void val(Object retVal) { System.out.println(retVal); }
