Hi Jim,
On Monday 05 October 2009 12:45:47 Jim Easterbrook wrote:
> I'm worried about cleanly shutting down pipelines of threaded
> components.
> [snip]
OK, I've read this, copied the generator version into /Sketches/MPS/List/JE.py
and run that, and as expected get the same result.
~/Checkouts/kamaelia/trunk/Sketches/MPS/List> ./JE.py
1000 messages received
Similarly I've placed the threaded version in JE_T.py. Running that 10 times
results in this:
~/Checkouts/kamaelia/trunk/Sketches/MPS/List> for i in `seq 10`;
do ./JE_T.py; done
213 messages received
696 messages received
676 messages received
205 messages received
232 messages received
870 messages received
952 messages received
1000 messages received
646 messages received
1000 messages received
Which is also the sort of behaviour you describe.
Adding the self.sync() call does this:
~/Checkouts/kamaelia/trunk/Sketches/MPS/List> for i in `seq 10`;
do ./JE_TS.py; done
1000 messages received
1000 messages received
1000 messages received
1000 messages received
1000 messages received
1000 messages received
1000 messages received
1000 messages received
1000 messages received
1000 messages received
The reason the latter works as you expect but the former doesn't is tricky to
explain, and I'm not actually sure whether this is a bug we can do something
about directly at this stage, or something we have to chalk up as a race
hazard.
On balance, I think it is a race hazard, but one for which we can create a
(better) workaround.
I'll try and summarise what's happening.
For safety reasons, threadedcomponent was originally implemented as simply as
possible. Specifically, the following happens:
* Someone creates a threadedcomponent - C
* They call .activate() on it. This does 2 relevant things:
  - C.main() is started inside a new thread
  - C._localmain() is activated as a normal
    Axon.Microprocess.microprocess
Essentially, when you activate a threaded component, 2 pieces of code are
activated - the component you're running, and a local "shadow" postman.
* This shadow postman runs in the same thread as the scheduler.
* The component's main() runs inside its own thread.
Communication between this shadow postman in the scheduler's thread and the
thread containing the component logic is via Queue.Queue objects - inqueues
and outqueues.
So when you send messages between 2 threaded components, the following occurs:
In the sender:
* self.send(something, "outbox")
  - something is added to a Queue.Queue called self.outqueues[boxname]
  - The _localmain / local postman is woken up
* _localmain/postman:
  - loops through self.outqueues[boxname] taking any values it finds,
    and delivers those into self.outboxes[boxname]
In the recipient:
* _localmain/postman:
  - loops through self.inboxes[boxname] taking any values it finds,
    and delivers those into self.inqueues[boxname]
Summarising that, each step runs in the thread shown:
a  Add to outqueue              [ in producer thread  ]
b  Move from outqueue to outbox [ in scheduler thread ]
c  Move from inbox to inqueue   [ in scheduler thread ]
d  Collect from inqueue         [ in consumer thread  ]
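To make steps a-d concrete, here is a minimal sketch of the same path. It is
not the Axon code: run_steps and drain are made-up names, the linkage between
the producer's outbox and the consumer's inbox is modelled as a shared list,
and everything runs in one thread purely so the order of the steps is easy to
see. It's written for Python 3, where the module is spelled "queue".

```python
import queue  # spelled "Queue" in Python 2


def drain(q):
    """Return everything currently sitting in a Queue, without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


def run_steps(messages):
    """Walk messages through steps a-d, one step after another."""
    outqueue = queue.Queue()  # producer thread -> scheduler thread
    outbox = []               # producer's outbox, owned by the scheduler thread
    inbox = outbox            # assumption: the linkage is modelled as a shared list
    inqueue = queue.Queue()   # scheduler thread -> consumer thread

    for msg in messages:            # a: add to outqueue    (producer thread)
        outqueue.put(msg)
    outbox.extend(drain(outqueue))  # b: outqueue -> outbox (scheduler thread)
    while inbox:                    # c: inbox -> inqueue   (scheduler thread)
        inqueue.put(inbox.pop(0))
    return drain(inqueue)           # d: collect            (consumer thread)
```

In the real thing a and d happen in their own threads while b and c happen
whenever the scheduler gets round to running the postman, which is where the
trouble comes from.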
Now, the interesting step here is "c". The way "c" is implemented is as
follows:
    for box in self.inboxes:
        # copy data from inboxes[box] to inqueue[box]
If we edit this (inside threadedcomponent._localmain) slightly to:
    for box in self.inboxes:
        print box
        # copy data from inboxes[box] to inqueue[box]
Then something jumps out:
~/Checkouts/kamaelia/trunk/Sketches/MPS/List> ./JE_T.py
control
inbox
control
inbox
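That is, "control" is serviced before "inbox" on every pass. In essence, this
means that in some circumstances data can be pushed into the consumer thread
faster via "control" than via "inbox". If the consumer thread has emptied its
"inbox" inqueue, it can then collect the shutdown message from "control" and
exit "early", while data is still waiting to be copied across from its inbox.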
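The following is a small, deterministic simulation of that interleaving. It
is only a sketch: postman_pass and simulate are invented names, the consumer
logic is my guess at a typical "drain inbox, then check control" loop, and
the "between" hook stands in for the consumer thread happening to run at that
exact moment (real threads would hit this only some of the time).

```python
import queue  # spelled "Queue" in Python 2


def postman_pass(inboxes, inqueues, order, between):
    """One pass of step c: copy each inbox into its thread-safe queue.

    `between` is a hypothetical hook, called after each box is copied,
    that stands in for the consumer thread being scheduled right then.
    """
    for box in order:
        while inboxes[box]:
            inqueues[box].put(inboxes[box].pop(0))
        between()


def simulate(order):
    """Return the data the consumer collected before it shut down."""
    inboxes = {"inbox": [1, 2, 3], "control": ["producerFinished"]}
    inqueues = {"inbox": queue.Queue(), "control": queue.Queue()}
    received = []
    state = {"done": False}

    def consumer_turn():
        if state["done"]:
            return  # the consumer has already exited
        while not inqueues["inbox"].empty():
            received.append(inqueues["inbox"].get())
        if not inqueues["control"].empty():
            state["done"] = True  # saw the shutdown message, so exit

    postman_pass(inboxes, inqueues, order, consumer_turn)
    return received
```

With these assumptions, simulate(["control", "inbox"]) comes back empty
because the consumer sees the shutdown message before any data has been
copied across, whereas simulate(["inbox", "control"]) returns all three
items.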
Now, what self.sync() is doing in this scenario is essentially adding in a
delay. We can find out how long that is:
~/Checkouts/kamaelia/trunk/Sketches/MPS/List> diff -u JE_TS.py JE_TSt.py
--- JE_TS.py 2009-10-06 23:54:21.000000000 +0100
+++ JE_TSt.py 2009-10-07 00:48:02.000000000 +0100
@@ -4,13 +4,17 @@
 from Axon.Ipc import producerFinished
 from Kamaelia.Chassis.Pipeline import Pipeline
 import sys
+import time
 class ProducerT(Axon.ThreadedComponent.threadedcomponent):
     def main(self):
         for i in range(1000):
             self.send(i, 'outbox')
+        t = time.time()
         self.sync()
         self.send(producerFinished(), 'signal')
+        t2 = time.time()
+        print "D", t2-t
 class ConsumerT(Axon.ThreadedComponent.threadedcomponent):
     def main(self):
~/Checkouts/kamaelia/trunk/Sketches/MPS/List> for i in `seq 10`;
do ./JE_TSt.py; done
D 0.0736348628998
1000 messages received
D 0.0742328166962
1000 messages received
D 0.0791389942169
1000 messages received
D 0.228218078613
1000 messages received
D 0.0888390541077
1000 messages received
D 0.259760141373
1000 messages received
D 0.0799508094788
1000 messages received
D 0.0782840251923
1000 messages received
D 0.0755040645599
1000 messages received
D 0.0769910812378
1000 messages received
Some of those delays are quite substantial, and it should be clear at this
point that this is a pure race hazard. The question arises: can something be
done about this?
Well, I think so, and there are a few options really:
1 Change the order in which box deliveries happen. This is fragile, but
  will probably work in this case.
2 Insert an extra component that adds in a delay to the signal/control
  path. Again, not ideal.
3 Change delivery of data for the thread's Queue.Queue objects to use
  direct delivery in the same way Axon.Boxes do. This is somewhat harder,
  but potentially much better. (Has the potential to simplify
  ThreadedComponent's logic substantially)
4 Change the shadow postman/localmain such that it checks control first.
  If it finds anything that *isn't* shutdownMicroprocess, it delivers all
  other boxes first. If it finds shutdownMicroprocess, it delivers that
  first.
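As a rough illustration of option 4, the ordering decision could look
something like the sketch below. None of this exists in Axon as written:
delivery_order is a hypothetical helper, and the two message classes are
empty stand-ins for the real ones in Axon.Ipc so that the snippet runs on its
own.

```python
class producerFinished(object):
    """Stand-in for Axon.Ipc.producerFinished (assumption, not the real class)."""


class shutdownMicroprocess(object):
    """Stand-in for Axon.Ipc.shutdownMicroprocess (assumption, not the real class)."""


def delivery_order(inboxes):
    """Decide the order in which the postman should service the boxes.

    `inboxes` maps box name -> list of pending messages. If a
    shutdownMicroprocess is waiting in "control", control goes first;
    otherwise every other box is delivered before control.
    """
    urgent = any(isinstance(msg, shutdownMicroprocess)
                 for msg in inboxes.get("control", []))
    others = [name for name in inboxes if name != "control"]
    if urgent:
        return ["control"] + others
    return others + ["control"]
```

So a pending producerFinished would wait until "inbox" had been copied
across, while a shutdownMicroprocess would still jump the queue.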
I think option 1 is the simplest pragmatic short-term solution, but
non-ideal. Option 3 is probably the "best" solution, and opens up some
interesting possibilities in other cases. Option 4 is probably the most
pragmatic overall.
I'm not sure which I'd pick, really. Option 2 _would_ allow you to control
this directly yourself.
Finally, it might be worth mentioning *why* the threaded component is written
this way. Essentially it's because it was the simplest, most direct thing that
could be thought of at the time. Specifically there was a need for a
component that could sit inside Axon and adapt from the generator world to a
thread. That implies something acting much like a postman, copying messages
from inboxes to threadsafe queues, and copying messages from threadsafe
queues to outboxes.
As a result you end up with the approach taken.
The sync() method actually has nothing to do with /bin/sync in unix, and in
retrospect, that name is not ideal. What the sync() method actually does is
this:
* It sends a message over a private threadsafe queue - threadedtoaxon -
  which says "call this in your thread", i.e. it's one way for a thread to
  ask another thread to call something.
* It also essentially asks the other end to do this synchronously.
* Crucially, this blocks until the scheduler thread has a chance to grab the
  object from the queue. This is done *after* the box deliveries are dealt
  with. (Which is why a "blind" time.sleep() doesn't guarantee delivery,
  which I'm guessing you tried)
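The mechanism described in those three points can be sketched roughly as
follows. This is a simplified model, not the Axon source: scheduler_pass and
demo are invented names, the request is an empty callable, and the scheduler
side blocks waiting for the request here only so the demo is deterministic
(a real scheduler would poll without blocking).

```python
import queue      # spelled "Queue" in Python 2
import threading

threadedtoaxon = queue.Queue()  # private queue: component thread -> scheduler thread


def sync():
    """Run in the component thread: block until the scheduler has serviced us."""
    reply = queue.Queue()
    threadedtoaxon.put((lambda: None, reply))  # "call this in your thread"
    reply.get()                                # wait for the scheduler's answer


def scheduler_pass(deliver_boxes, timeout=5.0):
    """Run in the scheduler thread: box deliveries first, then one request."""
    deliver_boxes()
    func, reply = threadedtoaxon.get(timeout=timeout)
    reply.put(func())  # releases the thread blocked in sync()


def demo():
    log = []

    def component():
        sync()
        log.append("sync returned")

    worker = threading.Thread(target=component)
    worker.start()
    scheduler_pass(lambda: log.append("boxes delivered"))
    worker.join()
    return log
```

Under these assumptions demo() always records "boxes delivered" before
"sync returned", which is the ordering a plain time.sleep() can't promise.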
The primary usage for sync is inside linkage _creation_ as the docs inside the
module describe.
It might be a good idea to allow a means to say "wait until outbox flushed" -
to give something similar to /bin/sync before exiting. This should be fairly
simple to implement cleanly. If this is interesting/useful, please let me
know. It'd give you another way to gain some determinism in your code!
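For what it's worth, one possible shape for "wait until outbox flushed" is
sketched below, using the join()/task_done() pair that the standard library's
queue class already provides. This is purely an assumption about how it might
be done, not a description of anything in Axon, and the names (demo, postman)
are made up. A None message is used as a stop marker for the toy postman.

```python
import queue      # spelled "Queue" in Python 2
import threading


def demo(n=1000):
    """Return how many messages had reached the outbox when join() returned."""
    outqueue = queue.Queue()  # producer thread -> postman
    outbox = []               # filled in by the postman

    def postman():
        while True:
            msg = outqueue.get()
            if msg is not None:
                outbox.append(msg)   # deliver before marking the item done
            outqueue.task_done()
            if msg is None:          # stop marker for this sketch only
                return

    worker = threading.Thread(target=postman)
    worker.start()
    for i in range(n):
        outqueue.put(i)
    outqueue.join()          # blocks until every queued item has been delivered
    flushed = len(outbox)    # safe to send producerFinished from this point
    outqueue.put(None)
    worker.join()
    return flushed
```

Because each item is appended to the outbox before task_done() is called,
join() can't return until everything sent so far has been delivered.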
I'd prefer to actually implement "3" above, but I can't guarantee a timeframe
for that - I'd rather implement that as a mini-axon first, and then build
outwards.
Hoping this is useful,
Michael.
--
http://yeoldeclue.com/blog
http://twitter.com/kamaelian
http://www.kamaelia.org/Home