I'm worried about cleanly shutting down pipelines of threaded
components. Consider this simple example as a starting point:

#!/usr/bin/env python

import Axon
from Axon.Ipc import producerFinished
from Kamaelia.Chassis.Pipeline import Pipeline

class Producer(Axon.Component.component):
    def main(self):
        for i in range(1000):
            self.send(i, 'outbox')
            yield 1
        self.send(producerFinished(), 'signal')
class Consumer(Axon.Component.component):
    def main(self):
        count = 0
        while 1:
            if self.dataReady('inbox'):
                msg = self.recv('inbox')
                count += 1
            elif self.dataReady('control'):
                msg = self.recv('control')
                if isinstance(msg, producerFinished):
                    break
            else:
                self.pause()
            yield 1
        print "%d messages received" % count
Pipeline(
    Producer(),
    Consumer(),
    ).run()

The producer emits 1000 messages on its outbox, then a shutdown message
on its signal outbox, then terminates. The consumer reads input from its
inbox until it receives a shutdown message on its control inbox, then
terminates, after telling us how many messages it received. Every time I
run this I get 1000 messages, as expected.

Now, let's replace the components with threaded components:

class ProducerT(Axon.ThreadedComponent.threadedcomponent):
    def main(self):
        for i in range(1000):
            self.send(i, 'outbox')
        self.send(producerFinished(), 'signal')
class ConsumerT(Axon.ThreadedComponent.threadedcomponent):
    def main(self):
        count = 0
        while 1:
            if self.dataReady('inbox'):
                msg = self.recv('inbox')
                count += 1
            elif self.dataReady('control'):
                msg = self.recv('control')
                if isinstance(msg, producerFinished):
                    break
            else:
                self.pause(0.1)
        print "%d messages received" % count
Pipeline(
    ProducerT(),
    ConsumerT(),
    ).run()

Now when I run the program I get any number of messages from 0 to 600 or
so. The consumer is receiving the signal message and terminating before
it's received all its data messages. I assume the remainder are sat in
one or both of the components' internal queues.

A simple solution I've found is to add a sync() statement to the producer:

class ProducerT(Axon.ThreadedComponent.threadedcomponent):
    def main(self):
        for i in range(1000):
            self.send(i, 'outbox')
        self.sync()
        self.send(producerFinished(), 'signal')

After doing this I get 1000 messages every time.

Now my questions. Is this a "proper" use of the sync() method? Its
documentation doesn't say anything about flushing internal queues - is
that what it actually does? Is there a better way of doing what I'm
after that's guaranteed to work in all circumstances?
-- 
Jim

--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups 
"kamaelia" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to 
[email protected]
For more options, visit this group at 
http://groups.google.com/group/kamaelia?hl=en
-~----------~----~----~----~------~----~------~--~---

Reply via email to