My hurried two cents:

Ignoring threaded component behaviour for a moment. Axon delivers messages
to the destination component immediately. So if you send a message(s) to
the "inbox" inbox of a component, then a message to the "control" inbox of
the same component; then if that component notices the message in its
"control" inbox, then it is guaranteed that the messages sent, earlier, to
its "inbox" inbox will already be there. Ie. (in the current
implementation at least) message delivery is guaranteed in-order.

Threaded components are more interesting, because of the internal inqueues
and outqueues you've identified.

Deliveries to a given single inbox will be in-order. But I don't think you
can guarantee delivery is in order when considering, as a whole, a
sequence of messages sent to two or more inboxes.

> A simple solution I've found is to add a sync() statement to the producer:
>
> class ProducerT(Axon.ThreadedComponent.threadedcomponent):
>     def main(self):
>         for i in range(1000):
>             self.send(i, 'outbox')
>         self.sync()
>         self.send(producerFinished(), 'signal')
>
> After doing this I get 1000 messages every time.
>
> Now my questions. Is this a "proper" use of the sync() method? Its
> documentation doesn't say anything about flushing internal queues - is
> that what it actually does? Is there a better way of doing what I'm
> after that's guaranteed to work in all circumstances?

This is good as far as I'm concerned :-) The sync, behaviourally, achieves
a very similar effect to shoving a yield statement in a normal component. 
- it ensures that other components get a slice of execution time before
the producerFinished() signal is sent. Tho it might not always be
necessary (see below).

It does cause queues to be flushed. And the documentation (I wrote) is
poor in that it does not explicitly state what actually happens (or what
is guaranteed from the perspective of a component writer). Yes, its
purpose when I coded it was to ensure queues are flushed.

The sync() call essentially does a synchronous RPC from the thread back to
the main thread of execution, so yes, in principle all messages pending at
inboxes ought to be pushed into the inqueues going to the thread.

However, even inserting self.sync() calls doesn't eliminate all race
conditions: a fresh message could arrive at the "control" inbox just after
sync() returns, and be delivered first (before any other messages pending
at the "inbox" inbox) meaning the thread might still miss them. Hmm.

I guess something like this might be a hacky inelegant work around (not
tested):

class MyConsumer(Axon.ThreadedComponent.threadedcomponent):
    def main(self):
        self.count=0
        while 1:
            dealWithInbox()
            if self.dataReady("control"):
                msg = self.recv('control')
                if isinstance(msg, producerFinished):
                    shutdownMsg = msg
                    break
            else:
                self.pause()
            yield 1

        self.sync() # force final deliveries
        dealWithInbox() # clear out any leftovers
        self.send(shutdownMsg, "signal")

    def dealWithInbox(self):
        msg = self.recv('inbox')
        self.count += 1




Matt
-- 
| Matt Hammond
|
| [anything you like unless it bounces] 'at' matthammond 'dot' org




--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"kamaelia" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to 
[email protected]
For more options, visit this group at 
http://groups.google.com/group/kamaelia?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to