Hi All,
I've been hacking away at the PN domain, trying to get it to do non-deterministic merging correctly. Although this is not part of the theory of PN, it's a nice property to have in practice. I've created a patch (attached) against PNDirector.java and PNQueueReceiver.java (the CVS versions) that works for my test cases.

Given that this is a hard problem, my patch probably doesn't address all the potential issues. In particular, I'm not sure it works correctly when another director is embedded in PN, which, I believe, is the case in which the get() and put() methods are called with a non-null branch argument.

I've also reattached a workflow I sent before. It demonstrates that execution does not finish correctly in PN with a non-deterministic merge, and this patch fixes that.

I'd appreciate it if people could look over this patch and send comments. :)

Thanks!
Xiaowen

P.S. If you got this email twice, sorry for the repeat.

On Tuesday, 15.02.2005, at 21:17 -0800, Edward A. Lee wrote:
> Xiaowen:
>
> In your example, you have multiple writers to one channel.
> Strictly speaking, this is not OK in PN. In particular, the Display
> actor is designed to take multiple inputs from multiple channels.
> Probably PN should detect this situation and flag an error.
>
> However, there is a more interesting issue that is exposed by this.
> Your diagnosis, in fact, is exactly on target.
> The design of PN assumes:
>
> - one thread per actor
> - one actor writing to each channel.
>
> If you violate these assumptions, then both the deadlock
> detection and bounded buffer scheduling ("Parks' algorithm")
> fail. However, violating these assumptions can be
> very useful sometimes... In particular, we have created
> a "NondeterministicMerge" actor for PN that violates these
> assumptions... This actor significantly (and dangerously)
> extends PN semantics... But it is often useful.
>
> We have been working on this issue, and have some ideas, but
> no concrete solutions.
> In particular, we have some simple
> test cases where we implement a nondeterministic merge
> in PN using multiple threads, and everything works except
> that deadlock detection can fail, and the bounded buffer
> scheduling scheme can interfere with deadlock detection and
> stop models prematurely.
>
> Unfortunately, I haven't really been successful at getting
> anyone in the Ptolemy group interested enough to actually fix
> the problem... I have some ideas, but regrettably, almost no
> time these days... Others in the group have contributed ideas,
> but not code... I would be happy to work with anyone
> who wants to take this on. There is likely a paper in the solution,
> as the whole question of deadlock detection and bounded buffering
> has a considerable literature.
>
> Edward
>
> At 02:35 PM 2/10/2005 -0800, xiaowen wrote:
> >Hi,
> >
> >I think I see where the problem is.
> >
> >A single PNQueueReceiver may have multiple actors/threads trying to call
> >its put() method. This is the case when a relation connects the outputs
> >of multiple actors to the input port of a single actor. If the queue is
> >not big enough, two threads may both block on the write, and each one
> >informs PNDirector that the queue is write-blocked. This leads the
> >director to believe that two different queues are blocked instead of
> >one. Later, when the capacity of the queue is increased or a token is
> >read and removed from it, the director decrements the number of
> >write-blocked queues only once. Thus the workflow never terminates,
> >because the director always thinks there's a write-blocked queue.
> >
> >Here's a diff against PNQueueReceiver.java in Ptolemy II CVS that seems to
> >fix the problem:
> >
> >$ diff ~/work/ptII/ptolemy/domains/pn/kernel/PNQueueReceiver.java PNQueueReceiver.java
> >420,421c420,423
> ><                 _writeBlocked = true;
> ><                 prepareToBlock(branch);
> >---
> > >                 if (false == _writeBlocked) {
> > >                     _writeBlocked = true;
> > >                     prepareToBlock(branch);
> > >                 }
> >
> >Perhaps the get() method also requires something similar. With this
> >change, the queue informs the director only once that it's blocking on
> >write. It seems to make the workflow I sent earlier work correctly. If
> >you agree this patch works, then please apply it to ptII CVS.
> >
> >Thanks,
> >Xiaowen
> >
> >On 08.02.2005, at 19:53, xiaowen wrote:
> >
> >>Hi Everyone,
> >>
> >>Attached please find a small workflow that doesn't appear to finish
> >>correctly. It is meant to write a short string to some files and
> >>immediately finish. However, it seems to stay in the "executing" state
> >>and never switches to the "execution finished" state. Will you please
> >>take a look and tell me what I'm missing?
> >>
> >>Thanks!
> >>Xiaowen
> >>
> >><execute.xml>
> >
> >----------------------------------------------------------------------------
> >Posted to the ptolemy-hackers mailing list. Please send administrative
> >mail for this list to: [EMAIL PROTECTED]
>
> ------------
> Edward A. Lee
> Professor, Chair of the EE Division, Associate Chair of EECS
> 231 Cory Hall, UC Berkeley, Berkeley, CA 94720
> phone: 510-642-0253 or 510-642-0455, fax: 510-642-2845
> [EMAIL PROTECTED], http://ptolemy.eecs.berkeley.edu/~eal
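To make the double-counting diagnosis from the February 10 message concrete, here is a single-threaded model of the write-block bookkeeping. It is only a sketch under simplifying assumptions: DirectorModel, FlagReceiver and CountingReceiver are hypothetical names, not Ptolemy II classes, and each method call stands for one event in a scenario where two writers share a full capacity-1 queue. FlagReceiver mimics the old single _writeBlocked flag; CountingReceiver mimics the per-writer counter plus pending flag (_writeNumBlocked, _writePending, setWritePending()) from the attached patch.

```java
/**
 * Hypothetical, single-threaded model of PN write-block bookkeeping.
 * None of these classes exist in Ptolemy II; each call is one event.
 */
public class BlockCountingSketch {

    /** Stands in for the director's count of write-blocked queues. */
    static class DirectorModel {
        int writeBlockCount = 0;
    }

    /** Old scheme: one boolean per receiver, however many writers block. */
    static class FlagReceiver {
        final DirectorModel director;
        boolean writeBlocked = false;
        FlagReceiver(DirectorModel director) { this.director = director; }

        /** A writer finds the queue full and reports a block. */
        void writerBlocks() {
            writeBlocked = true;
            director.writeBlockCount++;
        }

        /** A reader removes a token; the single flag yields one unblock. */
        void readerRemovesToken() {
            if (writeBlocked) {
                writeBlocked = false;
                director.writeBlockCount--;
            }
        }
    }

    /** Patched scheme: count blocked writers, credit at most one at a time. */
    static class CountingReceiver {
        final DirectorModel director;
        int writeNumBlocked = 0;
        boolean writePending = false;
        CountingReceiver(DirectorModel director) { this.director = director; }

        void writerBlocks() {
            writeNumBlocked++;
            director.writeBlockCount++;
        }

        /** Like setWritePending(): credit one blocked writer as unblocked. */
        void readerRemovesToken() {
            if (!writePending && writeNumBlocked > 0) {
                writePending = true;
                writeNumBlocked--;
                director.writeBlockCount--;
            }
        }

        /** A waiting writer finally writes its token. */
        void writerSucceeds() {
            if (writePending) {
                writePending = false;      // already credited by the reader
            } else {
                writeNumBlocked--;
                director.writeBlockCount--;
            }
        }
    }

    static int runFlagScheme() {
        DirectorModel d = new DirectorModel();
        FlagReceiver r = new FlagReceiver(d);
        r.writerBlocks();         // writer A blocks: count 1
        r.writerBlocks();         // writer B blocks too: count 2
        r.readerRemovesToken();   // one slot frees: count 1, A writes
        r.writerBlocks();         // B wakes, queue full again, re-blocks: count 2
        r.readerRemovesToken();   // another slot frees: count 1, B writes
        return d.writeBlockCount; // nobody is blocked, yet the count is 1
    }

    static int runCountingScheme() {
        DirectorModel d = new DirectorModel();
        CountingReceiver r = new CountingReceiver(d);
        r.writerBlocks();         // A: count 1
        r.writerBlocks();         // B: count 2
        r.readerRemovesToken();   // credit one writer: count 1
        r.writerSucceeds();       // A writes and consumes the credit
        r.readerRemovesToken();   // credit B: count 0
        r.writerSucceeds();       // B writes and consumes the credit
        return d.writeBlockCount;
    }

    public static void main(String[] args) {
        System.out.println("flag scheme: " + runFlagScheme());
        System.out.println("counting scheme: " + runCountingScheme());
    }
}
```

Running main prints 1 for the flag scheme (no writer is blocked any more, but the modeled director still sees one) and 0 for the counting scheme, which is the symptom and the fix described in the thread.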
<?xml version="1.0" standalone="no"?>
<!DOCTYPE entity PUBLIC "-//UC Berkeley//DTD MoML 1//EN"
    "http://ptolemy.eecs.berkeley.edu/xml/dtd/MoML_1.dtd">
<entity name="TwoRampBlock" class="ptolemy.actor.TypedCompositeActor">
    <property name="_createdBy" class="ptolemy.kernel.attributes.VersionAttribute" value="4.0.1">
    </property>
    <property name="PN Director" class="ptolemy.domains.pn.kernel.PNDirector">
        <property name="_location" class="ptolemy.kernel.util.Location" value="[95.0, 50.0]">
        </property>
    </property>
    <property name="_windowProperties" class="ptolemy.actor.gui.WindowPropertiesAttribute" value="{bounds={25, 7, 774, 530}, maximized=false}">
    </property>
    <property name="_vergilSize" class="ptolemy.actor.gui.SizeAttribute" value="[559, 403]">
    </property>
    <property name="_vergilZoomFactor" class="ptolemy.data.expr.ExpertParameter" value="1.0">
    </property>
    <property name="_vergilCenter" class="ptolemy.data.expr.ExpertParameter" value="{279.5, 201.5}">
    </property>
    <entity name="Ramp" class="ptolemy.actor.lib.Ramp">
        <property name="firingCountLimit" class="ptolemy.data.expr.Parameter" value="1">
        </property>
        <doc>Create a sequence of tokens with increasing value</doc>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[60.0, 165.0]">
        </property>
    </entity>
    <entity name="Ramp2" class="ptolemy.actor.lib.Ramp">
        <property name="firingCountLimit" class="ptolemy.data.expr.Parameter" value="1">
        </property>
        <doc>Create a sequence of tokens with increasing value</doc>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[445.0, 175.0]">
        </property>
    </entity>
    <entity name="Expression2" class="ptolemy.actor.lib.Expression">
        <property name="expression" class="ptolemy.kernel.util.StringAttribute" value="input">
        </property>
        <property name="_icon" class="ptolemy.vergil.icon.BoxedValueIcon">
            <property name="attributeName" class="ptolemy.kernel.util.StringAttribute" value="expression">
            </property>
            <property name="displayWidth" class="ptolemy.data.expr.Parameter" value="60">
            </property>
        </property>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[285.0, 165.0]">
        </property>
        <port name="input" class="ptolemy.actor.TypedIOPort">
            <property name="input"/>
            <property name="_type" class="ptolemy.actor.TypeAttribute" value="unknown">
            </property>
        </port>
    </entity>
    <entity name="Expression3" class="ptolemy.actor.lib.Expression">
        <property name="expression" class="ptolemy.kernel.util.StringAttribute" value="input">
        </property>
        <property name="_icon" class="ptolemy.vergil.icon.BoxedValueIcon">
            <property name="attributeName" class="ptolemy.kernel.util.StringAttribute" value="expression">
            </property>
            <property name="displayWidth" class="ptolemy.data.expr.Parameter" value="60">
            </property>
        </property>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[280.0, 330.0]">
        </property>
        <port name="input" class="ptolemy.actor.TypedIOPort">
            <property name="input"/>
            <property name="_type" class="ptolemy.actor.TypeAttribute" value="unknown">
            </property>
        </port>
    </entity>
    <entity name="Expression4" class="ptolemy.actor.lib.Expression">
        <property name="expression" class="ptolemy.kernel.util.StringAttribute" value="input">
        </property>
        <property name="_icon" class="ptolemy.vergil.icon.BoxedValueIcon">
            <property name="attributeName" class="ptolemy.kernel.util.StringAttribute" value="expression">
            </property>
            <property name="displayWidth" class="ptolemy.data.expr.Parameter" value="60">
            </property>
        </property>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[285.0, 270.0]">
        </property>
        <port name="input" class="ptolemy.actor.TypedIOPort">
            <property name="input"/>
            <property name="_type" class="ptolemy.actor.TypeAttribute" value="unknown">
            </property>
        </port>
    </entity>
    <entity name="Expression5" class="ptolemy.actor.lib.Expression">
        <property name="expression" class="ptolemy.kernel.util.StringAttribute" value="input">
        </property>
        <property name="_icon" class="ptolemy.vergil.icon.BoxedValueIcon">
            <property name="attributeName" class="ptolemy.kernel.util.StringAttribute" value="expression">
            </property>
            <property name="displayWidth" class="ptolemy.data.expr.Parameter" value="60">
            </property>
        </property>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[285.0, 215.0]">
        </property>
        <port name="input" class="ptolemy.actor.TypedIOPort">
            <property name="input"/>
            <property name="_type" class="ptolemy.actor.TypeAttribute" value="unknown">
            </property>
        </port>
    </entity>
    <relation name="relation" class="ptolemy.actor.TypedIORelation">
        <vertex name="vertex1" value="[140.0, 165.0]">
        </vertex>
    </relation>
    <relation name="relation3" class="ptolemy.actor.TypedIORelation">
        <vertex name="vertex1" value="[370.0, 165.0]">
        </vertex>
    </relation>
    <link port="Ramp.output" relation="relation"/>
    <link port="Ramp2.trigger" relation="relation3"/>
    <link port="Expression2.output" relation="relation3"/>
    <link port="Expression2.input" relation="relation"/>
    <link port="Expression3.output" relation="relation3"/>
    <link port="Expression3.input" relation="relation"/>
    <link port="Expression4.output" relation="relation3"/>
    <link port="Expression4.input" relation="relation"/>
    <link port="Expression5.output" relation="relation3"/>
    <link port="Expression5.input" relation="relation"/>
</entity>
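Edward suggests that PN should detect multiple writers on one channel and flag an error. In the workflow above, relation3 is linked to the outputs of Expression2 through Expression5 and to Ramp2.trigger. Below is a minimal sketch of such a check over that link list. It assumes a hypothetical Link class whose isOutput flag is filled in by hand from the port names; a real check inside PN would ask the actual port and relation objects for their directions instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical static check: flag relations with more than one writer. */
public class MultipleWriterCheck {

    /** One MoML link; isOutput is supplied by hand (an assumption). */
    static final class Link {
        final String port;
        final String relation;
        final boolean isOutput;

        Link(String port, String relation, boolean isOutput) {
            this.port = port;
            this.relation = relation;
            this.isOutput = isOutput;
        }
    }

    /** Maps each relation that has two or more writers to its writer ports. */
    static Map<String, List<String>> relationsWithMultipleWriters(List<Link> links) {
        Map<String, List<String>> writers = new LinkedHashMap<>();
        for (Link link : links) {
            if (link.isOutput) {
                writers.computeIfAbsent(link.relation, r -> new ArrayList<>()).add(link.port);
            }
        }
        // Keep only channels that violate the one-writer-per-channel assumption.
        writers.values().removeIf(ports -> ports.size() < 2);
        return writers;
    }

    /** The links of the TwoRampBlock workflow above. */
    static List<Link> workflowLinks() {
        return Arrays.asList(
                new Link("Ramp.output", "relation", true),
                new Link("Ramp2.trigger", "relation3", false),
                new Link("Expression2.output", "relation3", true),
                new Link("Expression2.input", "relation", false),
                new Link("Expression3.output", "relation3", true),
                new Link("Expression3.input", "relation", false),
                new Link("Expression4.output", "relation3", true),
                new Link("Expression4.input", "relation", false),
                new Link("Expression5.output", "relation3", true),
                new Link("Expression5.input", "relation", false));
    }

    public static void main(String[] args) {
        Map<String, List<String>> bad = relationsWithMultipleWriters(workflowLinks());
        for (Map.Entry<String, List<String>> e : bad.entrySet()) {
            System.out.println("Error: " + e.getKey() + " has multiple writers " + e.getValue());
        }
    }
}
```

For this workflow it reports only relation3, with the four Expression outputs as writers; relation has a single writer (Ramp.output) and passes.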
diff -ur kernel.old/PNDirector.java kernel/PNDirector.java
--- kernel.old/PNDirector.java 2005-03-31 15:49:33.000000000 -0800
+++ kernel/PNDirector.java 2005-03-31 16:23:39.000000000 -0800
@@ -431,12 +431,8 @@
                     + smallestCapacityQueue.getCapacity());
         }

-        _actorUnBlocked(smallestCapacityQueue);
-        smallestCapacityQueue.setWritePending(false);
-
-        synchronized (smallestCapacityQueue) {
-            smallestCapacityQueue.notifyAll();
-        }
+        smallestCapacityQueue.setWritePending();
+        this.notifyAll();
         return;
     }

@@ -447,14 +443,13 @@
      * of the process blocking on a read. If either of them is detected,
      * then notify the directing thread of the same.
      */
-    protected synchronized void _actorBlocked(ProcessReceiver receiver) {
-        if (receiver.isReadBlocked()) {
+    protected synchronized void _actorBlocked(
+            PNQueueReceiver receiver, boolean readBlocked) {
+        if(readBlocked) {
             _readBlockCount++;
-        }
-
-        if (receiver.isWriteBlocked()) {
-            _writeBlockedQueues.add(receiver);
+        } else {
             _writeBlockCount++;
+            _writeBlockedQueues.add(receiver);
         }

         super._actorBlocked(receiver);
@@ -465,19 +460,18 @@
      * the process listeners that the relevant process has resumed its
      * execution.
      * @param receiver The receiver that is unblocked.
+     * @param readBlocked Whether this is read or write blocked.
      */
-    protected synchronized void _actorUnBlocked(PNQueueReceiver receiver) {
-        if (receiver.isReadBlocked()) {
+    protected synchronized void _actorUnBlocked(
+            PNQueueReceiver receiver, boolean readBlocked) {
+        if(readBlocked) {
             _readBlockCount--;
-        }
-
-        if (receiver.isWriteBlocked()) {
+        } else {
             _writeBlockCount--;
             _writeBlockedQueues.remove(receiver);
         }

         super._actorUnBlocked(receiver);
-        return;
     }

     /** Resolve an artificial deadlock and return true. If the
@@ -555,8 +549,8 @@
     ///////////////////////////////////////////////////////////////////
     ////                         private variables                 ////

-    private LinkedList _processListeners = new LinkedList();

+    private LinkedList _processListeners = new LinkedList();
     /** The list of all receivers that this director has created. */
     private LinkedList _receivers = new LinkedList();
 }
diff -ur kernel.old/PNQueueReceiver.java kernel/PNQueueReceiver.java
--- kernel.old/PNQueueReceiver.java 2005-03-31 15:49:33.000000000 -0800
+++ kernel/PNQueueReceiver.java 2005-03-31 16:30:31.000000000 -0800
@@ -101,27 +101,6 @@
     ///////////////////////////////////////////////////////////////////
     ////                         public methods                    ////

-    /** Prepare to register a block. If the branch object specified as
-     * a parameter is non-null then register the block with the branch.
-     * If the branch object specified as a parameter is null then
-     * register the block with the local director.
-     * @param branch The Branch managing execution of this method.
-     */
-    public void prepareToBlock(Branch branch) {
-        // NOTE: This used to be synchronized on this object, but
-        // since it calls director methods that are synchronized,
-        // that would cause deadlock.
-        synchronized (_director) {
-            if (branch != null) {
-                branch.registerReceiverBlocked(this);
-                _otherBranch = branch;
-            } else {
-                _director._actorBlocked(this);
-                _otherBranch = branch;
-            }
-        }
-    }
-
     /** Set the container. This overrides the base class to record
      * the director.
      * @param port The container.
@@ -156,27 +135,6 @@
         _director = (PNDirector) director;
     }

-    /** Unblock this receiver and register this new state with
-     * either the monitoring branch or the local director. If
-     * there is no blocked branch waiting, then register the
-     * new state with the local director; otherwise, register
-     * the new state with the blocked branch.
-     */
-    public void wakeUpBlockedPartner() {
-        // NOTE: This method used to be synchronized on this
-        // receiver, but since it calls synchronized methods in
-        // the director, that can cause deadlock.
-        synchronized (_director) {
-            if (_otherBranch != null) {
-                _otherBranch.registerReceiverUnBlocked(this);
-            } else {
-                _director._actorUnBlocked(this);
-            }
-
-            _director.notifyAll();
-        }
-    }
-
     /** Get a token from this receiver. If the receiver is empty then
      * block until a token becomes available. Use the local director
      * to manage blocking reads that occur. If this receiver is
@@ -202,45 +160,58 @@
      * receiver. If a process is indeed blocked, then unblock the
      * process, and inform the director of the same.
      * Otherwise return.
-     * @param branch The Branch managin the execution of this method.
+     * @param branch The Branch managing the execution of this method.
      * @return The oldest Token read from the queue
      */
     public Token get(Branch branch) {
-        Workspace workspace = getContainer().workspace();
+
         Token result = null;

         // NOTE: This method used to be synchronized on this
         // receiver, but since it calls synchronized methods in
         // the director, that can cause deadlock.
-        synchronized (_director) {
-            while (!_terminate && !super.hasToken()) {
-                _readBlocked = true;
-                prepareToBlock(branch);
-
-                while (_readBlocked && !_terminate) {
-                    try {
-                        workspace.wait(_director);
-                    } catch (InterruptedException e) {
-                        _terminate = true;
-                        break;
-                    }
-                }
+        synchronized(_director) {
+            // OK, I'm blocked on read.
+            if(null == branch) {
+                _director._actorBlocked(this, true);
+                _readNumBlocked++;
+            } else {
+                branch.registerReceiverBlocked(this);
             }

-            if (_terminate) {
-                throw new TerminateProcessException("");
-            } else {
-                result = super.get();
+            while(!_terminate) {
+                // Try to read.
+                if(super.hasToken()) {
+                    result = super.get();
+                    if(null == branch) {
+                        if(_readPending) {
+                            _readPending = false;
+                        } else {
+                            _readNumBlocked--;
+                            _director._actorUnBlocked(this, true);
+                        }
+                    } else {
+                        branch.registerReceiverUnBlocked(this);
+                    }
+                    _director.notifyAll();
+                    setWritePending();
+                    break;
+                }

-                //Check if pending write to the Queue;
-                if (_writeBlocked) {
-                    wakeUpBlockedPartner();
-                    _writeBlocked = false;
+                // Wait to try again.
+                try {
+                    _director.wait();
+                } catch(InterruptedException e) {
+                    _terminate = true;
                 }
             }

-            return result;
+            if(_terminate) {
+                throw new TerminateProcessException("");
+            }
         }
+
+        return result;
     }

     /** Return true, since a channel in the Kahn process networks
@@ -379,7 +350,7 @@
         // receiver, but since it is called by synchronized methods in
         // the director, that can cause deadlock.
         synchronized (_director) {
-            return _readBlocked;
+            return 0 < _readNumBlocked;
         }
     }

@@ -393,7 +364,7 @@
         // receiver, but since it is called by synchronized methods in
         // the director, that can cause deadlock.
         synchronized (_director) {
-            return _writeBlocked;
+            return 0 < _writeNumBlocked;
         }
     }

@@ -427,37 +398,48 @@
      * @param token The token to be put in the receiver.
      */
     public void put(Token token, Branch branch) {
-        Workspace workspace = getContainer().workspace();

         // NOTE: This used to synchronize on this, but since it calls
         // director methods that are synchronized on the director,
         // this can cause deadlock.
-        synchronized (_director) {
-            while (!_terminate && !super.hasRoom()) {
-                _writeBlocked = true;
-                prepareToBlock(branch);
-
-                while (_writeBlocked && !_terminate) {
-                    try {
-                        workspace.wait(_director);
-                    } catch (InterruptedException e) {
-                        _terminate = true;
-                        break;
+        synchronized(_director) {
+            // OK, I'm blocked on write.
+            if(null == branch) {
+                _director._actorBlocked(this, false);
+                _writeNumBlocked++;
+            } else {
+                branch.registerReceiverBlocked(this);
+            }
+
+            while(!_terminate) {
+                // Try to write.
+                if(super.hasRoom()) {
+                    super.put(token);
+                    if(null == branch) {
+                        if(_writePending) {
+                            _writePending = false;
+                        } else {
+                            _writeNumBlocked--;
+                            _director._actorUnBlocked(this, false);
+                        }
+                    } else {
+                        branch.registerReceiverUnBlocked(this);
                     }
+                    _director.notifyAll();
+                    setReadPending();
+                    break;
+                }
+
+                // Wait to try again.
+                try {
+                    _director.wait();
+                } catch(InterruptedException e) {
+                    _terminate = true;
                 }
             }

             if (_terminate) {
                 throw new TerminateProcessException("");
-            } else {
-                //token can be put in the queue;
-                super.put(token);
-
-                //Check if pending write to the Queue;
-                if (_readBlocked) {
-                    wakeUpBlockedPartner();
-                    _readBlocked = false;
-                }
             }
         }
     }
@@ -465,8 +447,10 @@

     /** Reset the state variables in the receiver. */
     public void reset() {
-        _readBlocked = false;
-        _writeBlocked = false;
+        _readPending = false;
+        _writePending = false;
+        _readNumBlocked = 0;
+        _writeNumBlocked = 0;
         _terminate = false;
         _boundaryDetector.reset();
     }
@@ -476,12 +460,16 @@
      * @param readPending true if the calling process is blocking on a
      * read, false otherwise.
      */
-    public void setReadPending(boolean readPending) {
+    public void setReadPending() {
         // NOTE: This method used to be synchronized on this
         // receiver, but since it calls synchronized methods in
         // the director, that can cause deadlock.
         synchronized (_director) {
-            _readBlocked = readPending;
+            if(!_readPending && 0 < _readNumBlocked) {
+                _readPending = true;
+                _readNumBlocked--;
+                _director._actorUnBlocked(this, true);
+            }
         }
     }

@@ -490,12 +478,16 @@
      * @param writePending true if the calling process is blocking on
      * a write, false otherwise.
      */
-    public void setWritePending(boolean writePending) {
+    public void setWritePending() {
         // NOTE: This method used to be synchronized on this
         // receiver, but since it calls synchronized methods in
         // the director, that can cause deadlock.
         synchronized (_director) {
-            _writeBlocked = writePending;
+            if(!_writePending && 0 < _writeNumBlocked) {
+                _writePending = true;
+                _writeNumBlocked--;
+                _director._actorUnBlocked(this, false);
+            }
         }
     }

@@ -518,9 +510,14 @@
     /** The director in charge of this receiver. */
     private PNDirector _director;

-    private boolean _readBlocked = false;
-    private boolean _writeBlocked = false;
+
+    private boolean _readPending = false;
+    private boolean _writePending = false;
+    private int _readNumBlocked = 0;
+    private int _writeNumBlocked = 0;
+
     private boolean _terminate = false;
+
     private Branch _otherBranch = null;
     private BoundaryDetector _boundaryDetector;
 }
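The core pattern in the patched get() and put() can be exercised outside Ptolemy. The sketch below uses simplified, hypothetical stand-ins (SharedDirector and SketchReceiver are not Ptolemy II classes; termination, Branch handling and the director's deadlock detection are left out). Each call registers itself as blocked once, loops on the shared monitor with wait() and notifyAll(), and is credited as unblocked exactly once, either by itself or by the opposite side through a pending flag, as setReadPending() and setWritePending() do in the patch. Four writer threads share one capacity-1 channel with a single reader, roughly like the four Expression actors feeding Ramp2.trigger in the attached workflow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the patch's blocking pattern (not PNDirector or
 * PNQueueReceiver). Termination and Branch handling are omitted.
 */
public class PatchedReceiverSketch {

    /** Shared monitor that also holds the director-level blocked counts. */
    static class SharedDirector {
        int readBlockCount = 0, writeBlockCount = 0;
    }

    static class SketchReceiver {
        private final SharedDirector director;
        private final int capacity;
        private final ArrayDeque<Integer> queue = new ArrayDeque<>();
        private int readNumBlocked = 0, writeNumBlocked = 0;
        private boolean readPending = false, writePending = false;

        SketchReceiver(SharedDirector d, int c) { director = d; capacity = c; }

        int get() throws InterruptedException {
            synchronized (director) {
                director.readBlockCount++;        // register once per call
                readNumBlocked++;
                while (queue.isEmpty()) {
                    director.wait();              // releases the monitor while waiting
                }
                int token = queue.removeFirst();
                if (readPending) {
                    readPending = false;          // a writer already credited this reader
                } else {
                    readNumBlocked--;
                    director.readBlockCount--;
                }
                if (!writePending && writeNumBlocked > 0) { // like setWritePending()
                    writePending = true;
                    writeNumBlocked--;
                    director.writeBlockCount--;
                }
                director.notifyAll();
                return token;
            }
        }

        void put(int token) throws InterruptedException {
            synchronized (director) {
                director.writeBlockCount++;       // register once per call
                writeNumBlocked++;
                while (queue.size() >= capacity) {
                    director.wait();
                }
                queue.addLast(token);
                if (writePending) {
                    writePending = false;         // a reader already credited this writer
                } else {
                    writeNumBlocked--;
                    director.writeBlockCount--;
                }
                if (!readPending && readNumBlocked > 0) {   // like setReadPending()
                    readPending = true;
                    readNumBlocked--;
                    director.readBlockCount--;
                }
                director.notifyAll();
            }
        }
    }

    /** Four writers, one reader, capacity 1; returns {sum, readBlocked, writeBlocked}. */
    static int[] runScenario() {
        SharedDirector director = new SharedDirector();
        SketchReceiver receiver = new SketchReceiver(director, 1);
        List<Thread> writers = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            final int value = i;
            Thread t = new Thread(() -> {
                try { receiver.put(value); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            writers.add(t);
            t.start();
        }
        try {
            int sum = 0;
            for (int i = 0; i < 4; i++) {
                sum += receiver.get();
            }
            for (Thread t : writers) {
                t.join();                         // makes the writers' updates visible here
            }
            return new int[] {sum, director.readBlockCount, director.writeBlockCount};
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = runScenario();
        System.out.println("sum=" + r[0] + " readBlocked=" + r[1] + " writeBlocked=" + r[2]);
    }
}
```

runScenario() should return a sum of 10 with both blocked counts back at 0. The reason is that every registration is matched by exactly one decrement: a thread either decrements its own count when it succeeds, or consumes the single outstanding credit handed out by the other side, so the counts cannot drift the way the single-flag version did.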