I'm worried about cleanly shutting down pipelines of threaded
components. Consider this simple example as a starting point:
#!/usr/bin/env python
import Axon
from Axon.Ipc import producerFinished
from Kamaelia.Chassis.Pipeline import Pipeline
class Producer(Axon.Component.component):
def main(self):
for i in range(1000):
self.send(i, 'outbox')
yield 1
self.send(producerFinished(), 'signal')
class Consumer(Axon.Component.component):
def main(self):
count = 0
while 1:
if self.dataReady('inbox'):
msg = self.recv('inbox')
count += 1
elif self.dataReady('control'):
msg = self.recv('control')
if isinstance(msg, producerFinished):
break
else:
self.pause()
yield 1
print "%d messages received" % count
Pipeline(
Producer(),
Consumer(),
).run()
The producer emits 1000 messages on its outbox, then a shutdown message
on its signal outbox, then terminates. The consumer reads input from its
inbox until it receives a shutdown message on its control inbox, then
terminates, after telling us how many messages it received. Every time I
run this I get 1000 messages, as expected.
Now, let's replace the components with threaded components:
class ProducerT(Axon.ThreadedComponent.threadedcomponent):
def main(self):
for i in range(1000):
self.send(i, 'outbox')
self.send(producerFinished(), 'signal')
class ConsumerT(Axon.ThreadedComponent.threadedcomponent):
def main(self):
count = 0
while 1:
if self.dataReady('inbox'):
msg = self.recv('inbox')
count += 1
elif self.dataReady('control'):
msg = self.recv('control')
if isinstance(msg, producerFinished):
break
else:
self.pause(0.1)
print "%d messages received" % count
Pipeline(
ProducerT(),
ConsumerT(),
).run()
Now when I run the program I get any number of messages from 0 to 600 or
so. The consumer is receiving the signal message and terminating before
it's received all its data messages. I assume the remainder are sat in
one or both of the components' internal queues.
A simple solution I've found is to add a sync() statement to the producer:
class ProducerT(Axon.ThreadedComponent.threadedcomponent):
def main(self):
for i in range(1000):
self.send(i, 'outbox')
self.sync()
self.send(producerFinished(), 'signal')
After doing this I get 1000 messages every time.
Now my questions. Is this a "proper" use of the sync() method? Its
documentation doesn't say anything about flushing internal queues - is
that what it actually does? Is there a better way of doing what I'm
after that's guaranteed to work in all circumstances?
--
Jim
--~--~---------~--~----~------------~-------~--~----~
You received this message because you are subscribed to the Google Groups
"kamaelia" group.
To post to this group, send email to [email protected]
To unsubscribe from this group, send email to
[email protected]
For more options, visit this group at
http://groups.google.com/group/kamaelia?hl=en
-~----------~----~----~----~------~----~------~--~---