Hi All,

I've been hacking away at the PN domain, trying to get it to handle
non-deterministic merging correctly.  Although non-deterministic merge is
not part of PN theory, it's a nice property to have in practice.  I've
created a patch (attached) against PNDirector.java and
PNQueueReceiver.java (the CVS versions) that works for my test cases.

Given that this is a hard problem, my patch probably doesn't address all
the potential issues.  In particular, I'm not sure it works correctly
when another director is embedded inside PN, which I believe is the case
in which get() and put() are called with a non-null branch argument.

I've also reattached the workflow I sent earlier, which shows that
execution in PN does not finish correctly with a non-deterministic
merge.  This patch fixes that.

I'd appreciate it if people could look over this patch and send
comments. :)


Thanks!
Xiaowen


P.S. If you got this email twice, sorry for the repeat.


On Tuesday, 15.02.2005, at 21:17 -0800, Edward A. Lee wrote:
> Xiaowen:
> 
> In your example, you have multiple writers to one channel.
> Strictly speaking, this is not OK in PN.  In particular, the Display
> actor is designed to take multiple inputs from multiple channels.
> Probably PN should detect this situation and flag an error.
> 
> However, there is a more interesting issue that is exposed by this.
> Your diagnosis, in fact, is exactly on target.
> The design of PN assumes:
> 
>    - one thread per actor
>    - one actor writing to each channel.
> 
> If you violate these assumptions, then both the deadlock
> detection and bounded buffer scheduling ("Parks' algorithm")
> fail. However, violating these assumptions can be
> very useful sometimes...  In particular, we have created
> a "NondeterministicMerge" actor for PN that violates these
> assumptions...  This actor significantly (and dangerously)
> extends PN semantics... But it is often useful.
> 
> We have been working on this issue, and have some ideas, but
> no concrete solutions.  In particular, we have some simple
> test cases where we implement a nondeterministic merge
> in PN using multiple threads, and everything works except
> that deadlock detection can fail, and the bounded buffer
> scheduling scheme can interfere with deadlock detection and
> stop models prematurely.
> 
> Unfortunately, I haven't really been successful at getting
> anyone in the Ptolemy group interested enough to actually fix
> the problem...  I have some ideas, but regrettably, almost no
> time these days...  Others in the group have contributed ideas,
> but not code... I would be happy to work with anyone
> who wants to take this on.  There is likely a paper in the solution,
> as the whole question of deadlock detection and bounded buffering
> has a considerable literature.
> 
> Edward
> 
> At 02:35 PM 2/10/2005 -0800, xiaowen wrote:
> >Hi,
> >
> >
> >I think I see where the problem is.
> >
> >A single PNQueueReceiver may have multiple actors/threads trying to call
> >its put() method; this is the case when a relation connects the outputs
> >of multiple actors to the input port of one actor.  If the queue is not
> >big enough, two threads may both block on the write and each inform
> >PNDirector that the queue is write-blocked.  This leads the
> >director to believe that two different queues are blocked instead of 
> >one.  Later, when the capacity of the queue is increased or a token is 
> >read and removed from it, the number of write-blocked queues is only 
> >decremented once by the director.  Thus the workflow never terminates 
> >because it always thinks there's a write-blocked queue.
> >
> >Here's a diff against PNQueueReceiver.java in Ptolemy II CVS that seems to 
> >fix the problem:
> >
> >$ diff ~/work/ptII/ptolemy/domains/pn/kernel/PNQueueReceiver.java PNQueueReceiver.java
> >420,421c420,423
> ><                 _writeBlocked = true;
> ><                 prepareToBlock(branch);
> >---
> > >                 if (false == _writeBlocked) {
> > >                     _writeBlocked = true;
> > >                     prepareToBlock(branch);
> > >                 }
> >
> >
> >Perhaps the get() method also requires something similar.  With this
> >change, the queue informs the director only once that it is blocked on
> >a write.  It seems to make the workflow I sent earlier work correctly.
> >If you agree this patch works, then please apply it to ptII CVS.
> >
> >
> >Thanks,
> >Xiaowen
> >
> >
> >
> >On 08.02.2005, at 19:53, xiaowen wrote:
> >
> >>Hi Everyone,
> >>
> >>
> >>Attached please find a small workflow that doesn't appear to finish 
> >>correctly.  It is meant to write a short string to some files and 
> >>immediately finish.  However, it seems to stay in the "executing" state 
> >>and never switches to the "execution finished" state.  Will you please 
> >>take a look and tell me what I'm missing?
> >>
> >>
> >>Thanks!
> >>Xiaowen
> >>
> >>
> >>
> >>
> >><execute.xml>
> >
> >
> >----------------------------------------------------------------------------
> >Posted to the ptolemy-hackers mailing list.  Please send administrative
> >mail for this list to: [EMAIL PROTECTED]
> 
> ------------
> Edward A. Lee
> Professor, Chair of the EE Division, Associate Chair of EECS
> 231 Cory Hall, UC Berkeley, Berkeley, CA 94720
> phone: 510-642-0253 or 510-642-0455, fax: 510-642-2845
> [EMAIL PROTECTED], http://ptolemy.eecs.berkeley.edu/~eal  
> 
> 
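The accounting problem diagnosed in the 10 February message (two writers blocked on one full queue are reported twice but uncounted once) can be reproduced without threads.  What follows is a simplified, single-threaded model under stated assumptions: FakeDirector and FakeReceiver are hypothetical stand-ins, not Ptolemy II classes, for PNDirector's write-blocked count and PNQueueReceiver's single _writeBlocked flag, and the "guarded" case mirrors the one-line diff quoted in that message.

```java
// Hypothetical single-threaded model of the miscount (not Ptolemy II code).
public class DoubleCountDemo {

    // Stands in for the director's count of write-blocked queues.
    static class FakeDirector {
        int writeBlockCount = 0;

        void actorBlocked() { writeBlockCount++; }

        void actorUnBlocked() { writeBlockCount--; }
    }

    // Stands in for a receiver that records blocking with one boolean flag.
    static class FakeReceiver {
        private final FakeDirector director;
        private final boolean guarded; // true models the "if (false == _writeBlocked)" fix
        private boolean writeBlocked = false;

        FakeReceiver(FakeDirector director, boolean guarded) {
            this.director = director;
            this.guarded = guarded;
        }

        // A writer thread finds the queue full.
        void writerBlocks() {
            if (guarded && writeBlocked) {
                return; // already reported to the director; do not report again
            }
            writeBlocked = true;
            director.actorBlocked();
        }

        // A read frees space; the unblock is reported once for the whole queue.
        void roomAvailable() {
            if (writeBlocked) {
                writeBlocked = false;
                director.actorUnBlocked();
            }
        }
    }

    // Two writers block on the same full queue, then one token is read.
    static int simulate(boolean guarded) {
        FakeDirector director = new FakeDirector();
        FakeReceiver receiver = new FakeReceiver(director, guarded);
        receiver.writerBlocks();
        receiver.writerBlocks();
        receiver.roomAvailable();
        return director.writeBlockCount;
    }

    public static void main(String[] args) {
        System.out.println("unguarded: " + simulate(false));
        System.out.println("guarded:   " + simulate(true));
    }
}
```

Running main() prints a count of 1 for the unguarded case (a phantom write-blocked queue that keeps the model from finishing) and 0 for the guarded case.  The real receiver is more involved, since woken writers loop and may block again, so this only models the reporting imbalance.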
<?xml version="1.0" standalone="no"?>
<!DOCTYPE entity PUBLIC "-//UC Berkeley//DTD MoML 1//EN"
    "http://ptolemy.eecs.berkeley.edu/xml/dtd/MoML_1.dtd">
<entity name="TwoRampBlock" class="ptolemy.actor.TypedCompositeActor">
    <property name="_createdBy" class="ptolemy.kernel.attributes.VersionAttribute" value="4.0.1">
    </property>
    <property name="PN Director" class="ptolemy.domains.pn.kernel.PNDirector">
        <property name="_location" class="ptolemy.kernel.util.Location" value="[95.0, 50.0]">
        </property>
    </property>
    <property name="_windowProperties" class="ptolemy.actor.gui.WindowPropertiesAttribute" value="{bounds={25, 7, 774, 530}, maximized=false}">
    </property>
    <property name="_vergilSize" class="ptolemy.actor.gui.SizeAttribute" value="[559, 403]">
    </property>
    <property name="_vergilZoomFactor" class="ptolemy.data.expr.ExpertParameter" value="1.0">
    </property>
    <property name="_vergilCenter" class="ptolemy.data.expr.ExpertParameter" value="{279.5, 201.5}">
    </property>
    <entity name="Ramp" class="ptolemy.actor.lib.Ramp">
        <property name="firingCountLimit" class="ptolemy.data.expr.Parameter" value="1">
        </property>
        <doc>Create a sequence of tokens with increasing value</doc>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[60.0, 165.0]">
        </property>
    </entity>
    <entity name="Ramp2" class="ptolemy.actor.lib.Ramp">
        <property name="firingCountLimit" class="ptolemy.data.expr.Parameter" value="1">
        </property>
        <doc>Create a sequence of tokens with increasing value</doc>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[445.0, 175.0]">
        </property>
    </entity>
    <entity name="Expression2" class="ptolemy.actor.lib.Expression">
        <property name="expression" class="ptolemy.kernel.util.StringAttribute" value="input">
        </property>
        <property name="_icon" class="ptolemy.vergil.icon.BoxedValueIcon">
            <property name="attributeName" class="ptolemy.kernel.util.StringAttribute" value="expression">
            </property>
            <property name="displayWidth" class="ptolemy.data.expr.Parameter" value="60">
            </property>
        </property>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[285.0, 165.0]">
        </property>
        <port name="input" class="ptolemy.actor.TypedIOPort">
            <property name="input"/>
            <property name="_type" class="ptolemy.actor.TypeAttribute" value="unknown">
            </property>
        </port>
    </entity>
    <entity name="Expression3" class="ptolemy.actor.lib.Expression">
        <property name="expression" class="ptolemy.kernel.util.StringAttribute" value="input">
        </property>
        <property name="_icon" class="ptolemy.vergil.icon.BoxedValueIcon">
            <property name="attributeName" class="ptolemy.kernel.util.StringAttribute" value="expression">
            </property>
            <property name="displayWidth" class="ptolemy.data.expr.Parameter" value="60">
            </property>
        </property>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[280.0, 330.0]">
        </property>
        <port name="input" class="ptolemy.actor.TypedIOPort">
            <property name="input"/>
            <property name="_type" class="ptolemy.actor.TypeAttribute" value="unknown">
            </property>
        </port>
    </entity>
    <entity name="Expression4" class="ptolemy.actor.lib.Expression">
        <property name="expression" class="ptolemy.kernel.util.StringAttribute" value="input">
        </property>
        <property name="_icon" class="ptolemy.vergil.icon.BoxedValueIcon">
            <property name="attributeName" class="ptolemy.kernel.util.StringAttribute" value="expression">
            </property>
            <property name="displayWidth" class="ptolemy.data.expr.Parameter" value="60">
            </property>
        </property>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[285.0, 270.0]">
        </property>
        <port name="input" class="ptolemy.actor.TypedIOPort">
            <property name="input"/>
            <property name="_type" class="ptolemy.actor.TypeAttribute" value="unknown">
            </property>
        </port>
    </entity>
    <entity name="Expression5" class="ptolemy.actor.lib.Expression">
        <property name="expression" class="ptolemy.kernel.util.StringAttribute" value="input">
        </property>
        <property name="_icon" class="ptolemy.vergil.icon.BoxedValueIcon">
            <property name="attributeName" class="ptolemy.kernel.util.StringAttribute" value="expression">
            </property>
            <property name="displayWidth" class="ptolemy.data.expr.Parameter" value="60">
            </property>
        </property>
        <property name="_location" class="ptolemy.kernel.util.Location" value="[285.0, 215.0]">
        </property>
        <port name="input" class="ptolemy.actor.TypedIOPort">
            <property name="input"/>
            <property name="_type" class="ptolemy.actor.TypeAttribute" value="unknown">
            </property>
        </port>
    </entity>
    <relation name="relation" class="ptolemy.actor.TypedIORelation">
        <vertex name="vertex1" value="[140.0, 165.0]">
        </vertex>
    </relation>
    <relation name="relation3" class="ptolemy.actor.TypedIORelation">
        <vertex name="vertex1" value="[370.0, 165.0]">
        </vertex>
    </relation>
    <link port="Ramp.output" relation="relation"/>
    <link port="Ramp2.trigger" relation="relation3"/>
    <link port="Expression2.output" relation="relation3"/>
    <link port="Expression2.input" relation="relation"/>
    <link port="Expression3.output" relation="relation3"/>
    <link port="Expression3.input" relation="relation"/>
    <link port="Expression4.output" relation="relation3"/>
    <link port="Expression4.input" relation="relation"/>
    <link port="Expression5.output" relation="relation3"/>
    <link port="Expression5.input" relation="relation"/>
</entity>
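Edward's suggestion that PN could detect multiple writers to one channel and flag an error can be illustrated against this workflow.  This is a hypothetical sketch, not Ptolemy II API: it hard-codes the <link> elements above as (port, relation) pairs and assumes, for illustration only, that a port named "output" is a writer.  A real check would presumably ask each port itself, for example via IOPort.isOutput().

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical multiple-writer check on the TwoRampBlock links (not Ptolemy II API).
public class MultipleWriterCheck {

    // {port, relation} pairs copied from the <link> elements of the model.
    static final String[][] LINKS = {
        {"Ramp.output", "relation"},
        {"Ramp2.trigger", "relation3"},
        {"Expression2.output", "relation3"},
        {"Expression2.input", "relation"},
        {"Expression3.output", "relation3"},
        {"Expression3.input", "relation"},
        {"Expression4.output", "relation3"},
        {"Expression4.input", "relation"},
        {"Expression5.output", "relation3"},
        {"Expression5.input", "relation"},
    };

    // Assumption for illustration: a port named "output" is a writer.
    static Map<String, Integer> writersPerRelation(String[][] links) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String[] link : links) {
            if (link[0].endsWith(".output")) {
                counts.merge(link[1], 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> entry : writersPerRelation(LINKS).entrySet()) {
            if (entry.getValue() > 1) {
                System.out.println(entry.getKey() + " has " + entry.getValue()
                        + " writers; PN assumes one writer per channel");
            }
        }
    }
}
```

On this model it reports only relation3, with four writers (Expression2 through Expression5 all feeding Ramp2.trigger), which is the channel where the blocked-writer miscount arises.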
diff -ur kernel.old/PNDirector.java kernel/PNDirector.java
--- kernel.old/PNDirector.java  2005-03-31 15:49:33.000000000 -0800
+++ kernel/PNDirector.java      2005-03-31 16:23:39.000000000 -0800
@@ -431,12 +431,8 @@
                     + smallestCapacityQueue.getCapacity());
         }
 
-        _actorUnBlocked(smallestCapacityQueue);
-        smallestCapacityQueue.setWritePending(false);
-
-        synchronized (smallestCapacityQueue) {
-            smallestCapacityQueue.notifyAll();
-        }
+        smallestCapacityQueue.setWritePending();
+        this.notifyAll();
 
         return;
     }
@@ -447,14 +443,13 @@
      *  of the process blocking on a read. If either of them is detected,
      *  then notify the directing thread of the same.
      */
-    protected synchronized void _actorBlocked(ProcessReceiver receiver) {
-        if (receiver.isReadBlocked()) {
+    protected synchronized void _actorBlocked(
+        PNQueueReceiver receiver, boolean readBlocked) {
+        if(readBlocked) {
             _readBlockCount++;
-        }
-
-        if (receiver.isWriteBlocked()) {
-            _writeBlockedQueues.add(receiver);
+        } else {
             _writeBlockCount++;
+            _writeBlockedQueues.add(receiver);
         }
 
         super._actorBlocked(receiver);
@@ -465,19 +460,18 @@
      *  the process listeners that the relevant process has resumed its
      *  execution.
      *  @param receiver The receiver that is unblocked.
+     *  @param readBlocked Whether this is read or write blocked.
      */
-    protected synchronized void _actorUnBlocked(PNQueueReceiver receiver) {
-        if (receiver.isReadBlocked()) {
+    protected synchronized void _actorUnBlocked(
+        PNQueueReceiver receiver, boolean readBlocked) {
+        if(readBlocked) {
             _readBlockCount--;
-        }
-
-        if (receiver.isWriteBlocked()) {
+        } else {
             _writeBlockCount--;
             _writeBlockedQueues.remove(receiver);
         }
 
         super._actorUnBlocked(receiver);
-        return;
     }
 
     /** Resolve an artificial deadlock and return true. If the
@@ -555,8 +549,8 @@
 
     ///////////////////////////////////////////////////////////////////
     ////                         private variables                 ////
-    private LinkedList _processListeners = new LinkedList();
 
+    private LinkedList _processListeners = new LinkedList();
     /** The list of all receivers that this director has created. */
     private LinkedList _receivers = new LinkedList();
 }
diff -ur kernel.old/PNQueueReceiver.java kernel/PNQueueReceiver.java
--- kernel.old/PNQueueReceiver.java     2005-03-31 15:49:33.000000000 -0800
+++ kernel/PNQueueReceiver.java 2005-03-31 16:30:31.000000000 -0800
@@ -101,27 +101,6 @@
     ///////////////////////////////////////////////////////////////////
     ////                         public methods                    ////
 
-    /** Prepare to register a block. If the branch object specified as
-     *  a parameter is non-null then register the block with the branch.
-     *  If the branch object specified as a parameter is null then
-     *  register the block with the local director.
-     *  @param branch The Branch managing execution of this method.
-     */
-    public void prepareToBlock(Branch branch) {
-        // NOTE: This used to be synchronized on this object, but
-        // since it calls director methods that are synchronized,
-        // that would cause deadlock.
-        synchronized (_director) {
-            if (branch != null) {
-                branch.registerReceiverBlocked(this);
-                _otherBranch = branch;
-            } else {
-                _director._actorBlocked(this);
-                _otherBranch = branch;
-            }
-        }
-    }
-
     /** Set the container. This overrides the base class to record
      *  the director.
      *  @param port The container.
@@ -156,27 +135,6 @@
         _director = (PNDirector) director;
     }
 
-    /** Unblock this receiver and register this new state with
-     *  either the monitoring branch or the local director. If
-     *  there is no blocked branch waiting, then register the
-     *  new state with the local director; otherwise, register
-     *  the new state with the blocked branch.
-     */
-    public void wakeUpBlockedPartner() {
-        // NOTE: This method used to be synchronized on this
-        // receiver, but since it calls synchronized methods in
-        // the director, that can cause deadlock.
-        synchronized (_director) {
-            if (_otherBranch != null) {
-                _otherBranch.registerReceiverUnBlocked(this);
-            } else {
-                _director._actorUnBlocked(this);
-            }
-
-            _director.notifyAll();
-        }
-    }
-
     /** Get a token from this receiver. If the receiver is empty then
      *  block until a token becomes available. Use the local director
      *  to manage blocking reads that occur. If this receiver is
@@ -202,45 +160,58 @@
      *  receiver. If a process is indeed blocked, then unblock the
      *  process, and inform the director of the same.
      *  Otherwise return.
-     *  @param branch The Branch managin the execution of this method. 
+     *  @param branch The Branch managing the execution of this method.
      *  @return The oldest Token read from the queue
      */
     public Token get(Branch branch) {
-        Workspace workspace = getContainer().workspace();
+
         Token result = null;
 
         // NOTE: This method used to be synchronized on this
         // receiver, but since it calls synchronized methods in
         // the director, that can cause deadlock.
-        synchronized (_director) {
-            while (!_terminate && !super.hasToken()) {
-                _readBlocked = true;
-                prepareToBlock(branch);
-
-                while (_readBlocked && !_terminate) {
-                    try {
-                        workspace.wait(_director);
-                    } catch (InterruptedException e) {
-                        _terminate = true;
-                        break;
-                    }
-                }
+        synchronized(_director) {
+            // OK, I'm blocked on read.
+            if(null == branch) {
+                _director._actorBlocked(this, true);
+                _readNumBlocked++;
+            } else {
+                branch.registerReceiverBlocked(this);
             }
 
-            if (_terminate) {
-                throw new TerminateProcessException("");
-            } else {
-                result = super.get();
+            while(!_terminate) {
+                // Try to read.
+                if(super.hasToken()) {
+                    result = super.get();
+                    if(null == branch) {
+                        if(_readPending) {
+                            _readPending = false;
+                        } else {
+                            _readNumBlocked--;
+                            _director._actorUnBlocked(this, true);
+                        }
+                    } else {
+                        branch.registerReceiverUnBlocked(this);
+                    }
+                    _director.notifyAll();
+                    setWritePending();
+                    break;
+                }
 
-                //Check if pending write to the Queue;
-                if (_writeBlocked) {
-                    wakeUpBlockedPartner();
-                    _writeBlocked = false;
+                // Wait to try again.
+                try {
+                    _director.wait();
+                } catch(InterruptedException e) {
+                    _terminate = true;
                 }
             }
 
-            return result;
+            if(_terminate) {
+                throw new TerminateProcessException("");
+            }
         }
+
+        return result;
     }
 
     /** Return true, since a channel in the Kahn process networks
@@ -379,7 +350,7 @@
         // receiver, but since it is called by synchronized methods in
         // the director, that can cause deadlock.
         synchronized (_director) {
-            return _readBlocked;
+            return 0 < _readNumBlocked;
         }
     }
 
@@ -393,7 +364,7 @@
         // receiver, but since it is called by synchronized methods in
         // the director, that can cause deadlock.
         synchronized (_director) {
-            return _writeBlocked;
+            return 0 < _writeNumBlocked;
         }
     }
 
@@ -427,37 +398,48 @@
      *  @param token The token to be put in the receiver.
      */
     public void put(Token token, Branch branch) {
-        Workspace workspace = getContainer().workspace();
 
         // NOTE: This used to synchronize on this, but since it calls
         // director methods that are synchronized on the director,
         // this can cause deadlock.
-        synchronized (_director) {
-            while (!_terminate && !super.hasRoom()) {
-                _writeBlocked = true;
-                prepareToBlock(branch);
-
-                while (_writeBlocked && !_terminate) {
-                    try {
-                        workspace.wait(_director);
-                    } catch (InterruptedException e) {
-                        _terminate = true;
-                        break;
+        synchronized(_director) {
+            // OK, I'm blocked on write.
+            if(null == branch) {
+                _director._actorBlocked(this, false);
+                _writeNumBlocked++;
+            } else {
+                branch.registerReceiverBlocked(this);
+            }
+
+            while(!_terminate) {
+                // Try to write.
+                if(super.hasRoom()) {
+                    super.put(token);
+                    if(null == branch) {
+                        if(_writePending) {
+                            _writePending = false;
+                        } else {
+                            _writeNumBlocked--;
+                            _director._actorUnBlocked(this, false);
+                        }
+                    } else {
+                        branch.registerReceiverUnBlocked(this);
                     }
+                    _director.notifyAll();
+                    setReadPending();
+                    break;
+                }
+
+                // Wait to try again.
+                try {
+                    _director.wait();
+                } catch(InterruptedException e) {
+                    _terminate = true;
                 }
             }
 
             if (_terminate) {
                 throw new TerminateProcessException("");
-            } else {
-                //token can be put in the queue;
-                super.put(token);
-
-                //Check if pending write to the Queue;
-                if (_readBlocked) {
-                    wakeUpBlockedPartner();
-                    _readBlocked = false;
-                }
             }
         }
     }
@@ -465,8 +447,10 @@
     /** Reset the state variables in the receiver.
      */
     public void reset() {
-        _readBlocked = false;
-        _writeBlocked = false;
+        _readPending = false;
+        _writePending = false;
+        _readNumBlocked = 0;
+        _writeNumBlocked = 0;
         _terminate = false;
         _boundaryDetector.reset();
     }
@@ -476,12 +460,16 @@
      *  @param readPending true if the calling process is blocking on a
      *  read, false otherwise.
      */
-    public void setReadPending(boolean readPending) {
+    public void setReadPending() {
         // NOTE: This method used to be synchronized on this
         // receiver, but since it calls synchronized methods in
         // the director, that can cause deadlock.
         synchronized (_director) {
-            _readBlocked = readPending;
+            if(!_readPending && 0 < _readNumBlocked) {
+                _readPending = true;
+                _readNumBlocked--;
+                _director._actorUnBlocked(this, true);
+            }
         }
     }
 
@@ -490,12 +478,16 @@
      *  @param writePending true if the calling process is blocking on
      *  a write, false otherwise.
      */
-    public void setWritePending(boolean writePending) {
+    public void setWritePending() {
         // NOTE: This method used to be synchronized on this
         // receiver, but since it calls synchronized methods in
         // the director, that can cause deadlock.
         synchronized (_director) {
-            _writeBlocked = writePending;
+            if(!_writePending && 0 < _writeNumBlocked) {
+                _writePending = true;
+                _writeNumBlocked--;
+                _director._actorUnBlocked(this, false);
+            }
         }
     }
 
@@ -518,9 +510,14 @@
 
     /** The director in charge of this receiver. */
     private PNDirector _director;
-    private boolean _readBlocked = false;
-    private boolean _writeBlocked = false;
+
+    private boolean _readPending = false;
+    private boolean _writePending = false;
+    private int _readNumBlocked = 0;
+    private int _writeNumBlocked = 0;
+
     private boolean _terminate = false;
+
     private Branch _otherBranch = null;
     private BoundaryDetector _boundaryDetector;
 }
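The patch above replaces the boolean flags with per-receiver counts (_readNumBlocked, _writeNumBlocked) so that several threads blocked on one receiver are each reported to the director.  A minimal sketch of the same counting idea follows, under assumptions: Monitor and CountingQueue are hypothetical names, the Monitor object stands in for the director lock, and each blocked writer uncounts itself when it proceeds rather than using the patch's setReadPending()/setWritePending() handshake.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch, not Ptolemy II code: a bounded queue in which every
// writer thread that blocks is counted once and uncounted once.
public class CountingQueueSketch {

    // Plays the director's role: the shared lock plus the blocked-writer count.
    static class Monitor {
        int writeBlockCount = 0;
    }

    static class CountingQueue {
        private final Monitor monitor;
        private final Deque<Integer> tokens = new ArrayDeque<>();
        private final int capacity;

        CountingQueue(Monitor monitor, int capacity) {
            this.monitor = monitor;
            this.capacity = capacity;
        }

        void put(int token) throws InterruptedException {
            synchronized (monitor) {
                boolean counted = false;
                try {
                    while (tokens.size() >= capacity) {
                        if (!counted) {              // count this writer once, not per wakeup
                            monitor.writeBlockCount++;
                            counted = true;
                        }
                        monitor.wait();
                    }
                    tokens.addLast(token);
                    monitor.notifyAll();             // wake a reader waiting for data
                } finally {
                    if (counted) {
                        monitor.writeBlockCount--;   // uncount exactly this writer
                    }
                }
            }
        }

        int get() throws InterruptedException {
            synchronized (monitor) {
                while (tokens.isEmpty()) {
                    monitor.wait();
                }
                int token = tokens.removeFirst();
                monitor.notifyAll();                 // wake writers waiting for room
                return token;
            }
        }
    }

    // Several writers share one capacity-1 queue (the multiple-writer case);
    // the calling thread drains it. Returns the final blocked-writer count.
    static int run(int writers, int tokensPerWriter) {
        Monitor monitor = new Monitor();
        CountingQueue queue = new CountingQueue(monitor, 1);
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < writers; w++) {
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < tokensPerWriter; i++) {
                        queue.put(i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (int i = 0; i < writers * tokensPerWriter; i++) {
                queue.get();
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        synchronized (monitor) {
            return monitor.writeBlockCount;
        }
    }

    public static void main(String[] args) {
        System.out.println("writeBlockCount after run: " + run(4, 100));
    }
}
```

In this simplification a writer stays counted as blocked until it actually reacquires the lock, so a deadlock detector sampling writeBlockCount just after notifyAll() could briefly see a stale value.  The patch's setWritePending(), which uncounts one blocked writer from the reading side, appears intended to close that window.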
