Hi Jim,

On Monday 05 October 2009 12:45:47 Jim Easterbrook wrote:
> I'm worried about cleanly shutting down pipelines of threaded
> components. 
> [snip]

OK, I've read this, copied the generator version into /Sketches/MPS/List/JE.py 
and run that, and as expected get the same result.

    ~/Checkouts/kamaelia/trunk/Sketches/MPS/List> ./JE.py
    1000 messages received

Similarly I've placed the threaded version in JE_T.py. Running that 10 times 
results in this:

    ~/Checkouts/kamaelia/trunk/Sketches/MPS/List> for i in `seq 10`;
      do ./JE_T.py; done
    213 messages received
    696 messages received
    676 messages received
    205 messages received
    232 messages received
    870 messages received
    952 messages received
    1000 messages received
    646 messages received
    1000 messages received

Which is also the sort of behaviour you describe.

Adding the self.sync() call does this:

    ~/Checkouts/kamaelia/trunk/Sketches/MPS/List> for i in `seq 10`;
     do ./JE_TS.py; done
    1000 messages received
    1000 messages received
    1000 messages received
    1000 messages received
    1000 messages received
    1000 messages received
    1000 messages received
    1000 messages received
    1000 messages received
    1000 messages received

The reason the latter works as you expect, but the former doesn't, is tricky to 
explain, and I'm not actually sure if this is a bug we can do something about 
directly at this stage, or if it's something we have to chalk up as a race 
hazard.

I think it can be chalked up as a race hazard for which we can create a 
(better) work around.

I'll try and summarise what's happening.

For safety reasons, threadedcomponent was implemented as simply as possible 
originally. Specifically, the following happens:
    * Someone creates a threadedcomponent - C
    * They call .activate() on it. This does 2 relevant things:
        - C.main() is activated inside a thread
        - C._localmain() is activated as an ordinary
           Axon.Microprocess.microprocess.

Essentially, when you activate a threaded component, 2 pieces of code are 
activated - the component you're running, and what is essentially a local
postman.
    * This shadow postman runs in the same thread as the scheduler.
    * The component's main() runs inside its own thread.

Communication between this shadow postman in the scheduler's thread and the 
thread containing the component logic is via Queue.Queue objects - inqueues 
and outqueues.
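
To make that concrete, here is a minimal sketch of the arrangement. It is
not the Axon code: ToyThreaded, its single outqueue/outbox pair and the bare
"for" loop standing in for the scheduler are all made up for illustration,
and it is written for Python 3 (where Queue is spelled queue).

```python
import queue      # this module is called Queue in Python 2
import threading

class ToyThreaded:
    """Toy stand-in for a threaded component (hypothetical, not Axon)."""

    def __init__(self):
        self.outqueue = queue.Queue()   # component thread -> shadow postman
        self.outbox = []                # ordinary box, scheduler side only
        self._thread = threading.Thread(target=self.main)

    def main(self):
        # The component logic: runs inside its own thread.
        for i in range(5):
            self.outqueue.put(i)

    def _localmain(self):
        # The shadow postman: a generator stepped from the scheduler's thread.
        while self._thread.is_alive() or not self.outqueue.empty():
            try:
                self.outbox.append(self.outqueue.get_nowait())
            except queue.Empty:
                pass
            yield 1

    def activate(self):
        # Activation starts both pieces of code.
        self._thread.start()
        return self._localmain()

component = ToyThreaded()
postman = component.activate()
for _ in postman:       # stand-in for the scheduler running the postman
    pass
print(component.outbox)
```

The point is only that the thread never touches the outbox itself; everything
crosses over via the threadsafe queue.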

So when you send messages between 2 threaded components, the following occurs:

In the sender:
    * self.send(something, "outbox")
        - something is added to a Queue.Queue called self.outqueues[boxname]
        - The _localmain / local postman is woken up
    * _localmain/postman:
        - loops through self.outqueues[boxname] taking any values it finds,
          and delivers those into self.outboxes[boxname]

In the recipient:
    * _localmain/postman:
        - loops through self.inboxes[boxname] taking any values it finds,
          and delivers those into self.inqueues[boxname]

Summarising that, the following happens in the following threads:
    a Add to outqueue [ in producer thread]
    b move from outqueue to outbox [ in scheduler thread ]
    c move from inbox to inqueue [ in scheduler thread ]
    d Collect from inqueue [ in consumer thread ]
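
Steps a to d can be traced with a small sketch. Again this is not Axon code:
the single outqueue/inqueue pair, the plain lists used as boxes and the
list copy standing in for the linkage are all assumptions, and the threads
are run one after another so that the result is repeatable.

```python
import queue      # this module is called Queue in Python 2
import threading

outqueue = queue.Queue()   # producer thread -> scheduler thread
inqueue = queue.Queue()    # scheduler thread -> consumer thread
outbox, inbox = [], []     # ordinary boxes, only touched by the scheduler thread
received = []

def producer_main():
    for i in range(3):
        outqueue.put(i)                      # (a) in producer thread

def postman_pass():
    while not outqueue.empty():              # (b) in scheduler thread
        outbox.append(outqueue.get_nowait())
    inbox.extend(outbox)                     # stand-in for the outbox -> inbox linkage
    del outbox[:]
    while inbox:                             # (c) in scheduler thread
        inqueue.put(inbox.pop(0))

def consumer_main():
    for _ in range(3):
        received.append(inqueue.get())       # (d) in consumer thread

producer = threading.Thread(target=producer_main)
consumer = threading.Thread(target=consumer_main)
producer.start()
producer.join()      # wait, so that one postman pass sees every message
postman_pass()       # the main thread plays the scheduler
consumer.start()
consumer.join()
print(received)      # [0, 1, 2]
```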

Now, the interesting step here is "c". The way "c" is implemented is as
follows:
          for box in self.inboxes:
              # copy data from inboxes[box] to inqueue[box]

If we edit this (inside threadedcomponent._localmain) slightly to:
          for box in self.inboxes:
              print box
              # copy data from inboxes[box] to inqueue[box]

Then something jumps out:
    ~/Checkouts/kamaelia/trunk/Sketches/MPS/List> ./JE_T.py
    control
    inbox
    control
    inbox

In essence, this means that in some circumstances data can be pushed into the 
consumer thread faster via "control" than via "inbox". If the consumer thread 
empties its "inbox" inqueue before the remaining messages have been copied 
across, it can then collect the shutdown message from "control" and exit 
"early".
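
The effect of the delivery order can be shown without any real threads. The
sketch below is a deterministic toy, not the Axon code: postman_pass,
consumer_step and run are made-up names, and the interleaving (the consumer
gets to run after every single box copy) is a deliberately chosen worst case.

```python
from collections import deque

def postman_pass(inboxes, inqueues, order):
    """Copy each box in turn, yielding after each one so that the caller
    can let the 'consumer thread' run in between."""
    for box in order:
        while inboxes[box]:
            inqueues[box].append(inboxes[box].popleft())
        yield box

def consumer_step(inqueues, received):
    """One iteration of a typical consumer loop: drain inbox, then look at
    control. Returns True if the consumer would now shut down."""
    while inqueues["inbox"]:
        received.append(inqueues["inbox"].popleft())
    return bool(inqueues["control"])

def run(order):
    inboxes = {"inbox": deque(range(5)),
               "control": deque(["producerFinished"])}
    inqueues = {"inbox": deque(), "control": deque()}
    received = []
    for _ in postman_pass(inboxes, inqueues, order):
        if consumer_step(inqueues, received):
            break                    # consumer exits; later deliveries are lost
    return len(received)

print(run(["control", "inbox"]))     # 0 messages received
print(run(["inbox", "control"]))     # 5 messages received
```

With real threads the interleaving varies from run to run, which is why the
message counts above differ each time.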

Now, what self.sync() is doing in this scenario is essentially adding in a 
delay. We can find out how long that is:

    ~/Checkouts/kamaelia/trunk/Sketches/MPS/List> diff -u JE_TS.py JE_TSt.py
    --- JE_TS.py    2009-10-06 23:54:21.000000000 +0100
    +++ JE_TSt.py   2009-10-07 00:48:02.000000000 +0100
    @@ -4,13 +4,17 @@
     from Axon.Ipc import producerFinished
     from Kamaelia.Chassis.Pipeline import Pipeline
     import sys
    +import time

     class ProducerT(Axon.ThreadedComponent.threadedcomponent):
         def main(self):
             for i in range(1000):
                 self.send(i, 'outbox')
    +        t = time.time()
             self.sync()
             self.send(producerFinished(), 'signal')
    +        t2 = time.time()
    +        print "D", t2-t

     class ConsumerT(Axon.ThreadedComponent.threadedcomponent):
         def main(self):

    ~/Checkouts/kamaelia/trunk/Sketches/MPS/List> for i in `seq 10`;
      do ./JE_TSt.py; done
    D 0.0736348628998
    1000 messages received
    D 0.0742328166962
    1000 messages received
    D 0.0791389942169
    1000 messages received
    D 0.228218078613
    1000 messages received
    D 0.0888390541077
    1000 messages received
    D 0.259760141373
    1000 messages received
    D 0.0799508094788
    1000 messages received
    D 0.0782840251923
    1000 messages received
    D 0.0755040645599
    1000 messages received
    D 0.0769910812378
    1000 messages received

Some of those delays are quite substantial, and it should be clear at this 
point that this is a pure race hazard. The question arises, can something be 
done about this?

Well, I think there can, and there are a few options really:
    1 Change the order in which box deliveries happen. This is fragile, but
       will work in this case, probably.
    2 Insert an extra component that adds in a delay to the signal/control
      path. Again, not ideal.
    3 Change delivery of data for the thread's Queue.Queue objects to use
       direct delivery in the same way Axon.Boxes do. This is somewhat harder,
       but potentially much better. (Has the potential to simplify
       ThreadedComponent's logic substantially)
    4 Change the shadow postman/localmain such that it checks control first.
       If it finds anything that *isn't* shutdownMicroprocess, it delivers all
       other boxes first. If it finds shutdownMicroprocess, it delivers that
       first.
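
As a rough sketch of what option 4 means for the order of deliveries: the
function below only decides the order, it is not a patch to _localmain, and
delivery_order is a made-up name. The two message classes are empty
stand-ins for the ones in Axon.Ipc so that the example runs on its own.

```python
class producerFinished(object):
    """Stand-in for Axon.Ipc.producerFinished."""

class shutdownMicroprocess(object):
    """Stand-in for Axon.Ipc.shutdownMicroprocess."""

def delivery_order(inboxes):
    """Return the box names in the order option 4 would deliver them."""
    others = [name for name in inboxes if name != "control"]
    urgent = any(isinstance(msg, shutdownMicroprocess)
                 for msg in inboxes.get("control", []))
    if urgent:
        return ["control"] + others      # hard shutdown jumps the queue
    return others + ["control"]          # anything else waits for the data

pending = {"control": [producerFinished()], "inbox": [1, 2, 3]}
print(delivery_order(pending))           # ['inbox', 'control']

pending = {"control": [shutdownMicroprocess()], "inbox": [1, 2, 3]}
print(delivery_order(pending))           # ['control', 'inbox']
```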

I think the first is the simplest pragmatic short term solution, but 
non-ideal. The third is probably the "best" solution, and opens up some 
interesting possibilities in other cases. The last option is probably overall 
the most pragmatic.

I'm not sure really. The second option _would_ allow you to control this 
directly yourself.

Finally, it might be worth mentioning *why* the threaded component is written 
this way. Essentially it's because it was the simplest most direct thing that 
could be thought of at the time. Specifically there was a need for something 
which was a component that could sit inside axon, and adapt from the 
generator world to a thread. That implies something acting much like a 
postman, and copying messages from inboxes to threadsafe queues, and copying 
messages from threadsafe queues to outboxes.

As a result you end up with the approach taken.

The sync() method actually has nothing to do with /bin/sync in unix, and in 
retrospect, that name is not perfect. What the sync() method actually does is 
this:
    * It sends a message over a private threadsafe queue - threadedtoaxon -
      which says "call this in your thread", i.e. it's one way for a thread to
      ask another thread to call something.
    * It also essentially asks the other end to do this synchronously.
    * Crucially this blocks until the scheduler thread has a chance to grab the
      object from the queue. This is done *after* the box deliveries are dealt
      with. (Which is why a "blind" time.sleep() doesn't guarantee delivery,
      which I'm guessing you tried)
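
The general shape of that mechanism, as a minimal sketch: only the queue
name threadedtoaxon comes from the description above. sync_call,
scheduler_pass, the (callable, Event) message format and the blocking get()
are assumptions made to keep the example short, not what threadedcomponent
actually does internally.

```python
import queue      # this module is called Queue in Python 2
import threading

threadedtoaxon = queue.Queue()   # private queue: component thread -> scheduler thread
log = []

def sync_call(func):
    """Ask the scheduler thread to call func, and block until it has."""
    done = threading.Event()
    threadedtoaxon.put((func, done))
    done.wait()

def component_main():
    log.append("component: before sync")
    sync_call(lambda: log.append("scheduler: ran requested call"))
    log.append("component: after sync")

def scheduler_pass():
    log.append("scheduler: box deliveries")   # deliveries are dealt with first
    func, done = threadedtoaxon.get()         # blocking here only for the demo
    func()
    done.set()                                # releases the waiting thread

worker = threading.Thread(target=component_main)
worker.start()
scheduler_pass()      # the main thread plays the scheduler
worker.join()
print(log)
```

Whatever the thread interleaving, the requested call happens after that
pass's box deliveries and before the component carries on.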

The primary usage for sync is inside linkage _creation_ as the docs inside the 
module describe.

It might be a good idea to allow a means to say "wait until outbox flushed" - 
to give something similar to /bin/sync before exiting. This should be fairly 
simple to implement cleanly. If this is interesting/useful, please let me 
know. It'd give you another way to gain some determinism in your code!
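
One way such a "wait until outbox flushed" could behave is sketched below,
using the join()/task_done() pair that Queue.Queue already provides. Nothing
like this exists in threadedcomponent today; the names and the fixed message
count are purely illustrative.

```python
import queue      # this module is called Queue in Python 2
import threading

outqueue = queue.Queue()   # producer thread -> shadow postman
outbox = []                # where the postman delivers to
events = []

def producer_main():
    for i in range(100):
        outqueue.put(i)
    outqueue.join()        # hypothetical "wait until outbox flushed"
    events.append(("flushed", len(outbox)))

def postman(count):
    # Runs in the scheduler thread; marks each item done once it is delivered.
    for _ in range(count):
        outbox.append(outqueue.get())
        outqueue.task_done()

worker = threading.Thread(target=producer_main)
worker.start()
postman(100)               # the main thread plays the scheduler
worker.join()
print(events)              # [('flushed', 100)]
```

Because join() is only called after all 100 messages have been put, it cannot
return until every one of them has reached the outbox.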

I'd prefer to actually implement "3" above, but I can't guarantee a timeframe 
for that - I'd rather implement that as a mini-axon first, and then build 
outwards.

Hoping this is useful,


Michael.
-- 
http://yeoldeclue.com/blog
http://twitter.com/kamaelian
http://www.kamaelia.org/Home
