On Thursday 08 October 2009 14:28:58 Jim Easterbrook wrote:
> On 08/10/2009 13:45, Matt Hammond wrote:
> > Ignoring threaded component behaviour for a moment. Axon delivers
> > messages to the destination component immediately. So if you send a
> > message(s) to the "inbox" inbox of a component, then a message to the
> > "control" inbox of the same component; then if that component notices the
> > message in its "control" inbox, then it is guaranteed that the messages
> > sent, earlier, to its "inbox" inbox will already be there. Ie. (in the
> > current
> > implementation at least) message delivery is guaranteed in-order.
>
> Good. I, for one, would like this to be made part of the "specification"
> so future implementations preserve this aspect.
It's guaranteed that inboxes and outboxes are FIFO queues, and guaranteed to
have in order delivery. You could define a special outbox which didn't
guarantee delivery or order - ala UDP - but that would be more sensible to
implement as a component so someone knew explicitly what they're getting
into.
> > It does cause queues to be flushed. And the documentation (I wrote) is
> > poor in that it does not explicitly state what actually happens (or what
> > is guaranteed from the perspective of a component writer). Yes, its
> > purpose when I coded it was to ensure queues are flushed.
>
> That's reassuring. I'd sort of worked out from the code what it was up
> to, but it's nice to have confirmation from the author.
It's the maintainer you need to worry about. Given I'm working on redoing the
scheduler, there wasn't any guarantee this trick would work in future. I'll
add a self.flushboxes() method to threadedcomponent that (for the moment)
calls sync().
Whilst Matth says the purpose of sync was to flush the queues, the code
doesn't really agree - it looks more like an accident of implementation.
That's not to say that wasn't the intent, but you'd be hard pressed to take
away that interpretation from the code.
cf:
Regulating speed
----------------
In addition to being able to pause (with an optional timeout), a threaded
component can also regulate its speed by briefly synchronising with the
rest of the system. Calling the sync() method simply briefly blocks until
the rest of the system can acknowledge.
...
def sync(self):
"""\
Call this from main() to synchronise with the main scheduler's thread.
You may wish to do this to throttle your component's behaviour
This is akin to posix.sched_yield or shoving extra "yield"
statements into a component's generator.
"""
return self._do_threadsafe( lambda:None, [], {} )
Which means that the anonymous function "lambda: None" (called here "anon" for
sake of discussion) is called inside the scheduler.
_do_threadsafe's purpose is essentially described as follows:
For other methods such as link(), unlink() and (in the case of
threadedadaptivecommscomponent) addInbox(), deleteInbox(),
addOutbox() and deleteOutbox(), the _localmain() microprocess
also acts on the thread's behalf.
...
This is implemented by the _do_threadsafe() method.
Crucially, in order for this to flush the queues, it has to work on the
assumption that this can and will cause _localmain to have a full cycle
before returning. This relies on a side effect of the way the scheduler is
implemented today - which if left relied upon makes the system brittle.
> > However, even inserting self.sync() calls doesn't eliminate all race
> > conditions: a fresh message could arrive at the "control" inbox just
> > after sync() returns, and be delivered first (before any other messages
> > pending at the "inbox" inbox) meaning the thread might still miss them.
> > Hmm.
>
> I'd not given much thought to inboxes when I wrote my original, being
> more concerned with the producer's outboxes. However, testing my
> threaded consumer with the unthreaded producer revealed that inboxes are
> also a problem.
That's because as I noted the issue is in the fact that movement of data from
inboxes to inqueues results in data from the control inbox being passed over
by the localmain first.
Thinking about it a flush() method in both threaded/non-threaded components
would solve this, as would changing the message delivery system. The latter
would would be backwards compatible with existing components though, and make
the system work essentially the way you want it to. (ie without making
components more complex and without needing to alter existing logic).
I'll implement the latter.
> Here's my solution (tested) which I think achieves the same thing:
>
> class ConsumerT(Axon.ThreadedComponent.threadedcomponent):
> def main(self):
> count = 0
> while 1:
> if self.dataReady('inbox'):
> msg = self.recv('inbox')
> count += 1
> elif self.dataReady('control'):
> self.sync()
> if not self.dataReady('inbox'):
> msg = self.recv('control')
> if isinstance(msg, producerFinished):
> break
> else:
> self.pause(0.1)
> print "%d messages received" % count
That would work, but from a performance perspective, it's a huge hit on the
system.
> This just does a sync() any time there is data in 'control', and defers
> dealing with the control input until there is no inbox input.
>
> Note that in neither case am I considering inbox messages sent after the
> control message. My components do not send anything after a control
> message.
I think whilst the above works, making Axon work the way you expected it to in
the first place is probably more beneficial.
It's a bug in Axon IMO, thanks for discussing it here :-)
We can also make a feature (sync() -> flush() ) clearer, but that's a fringe
benefit IMO.
Michael.
--
http://yeoldeclue.com/blog
http://twitter.com/kamaelian
http://www.kamaelia.org/Home
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"kamaelia" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/kamaelia?hl=en
-~----------~----~----~----~------~----~------~--~---