On 10/11/2009 14:36, Jim Easterbrook wrote:
> Is it possible to subclass Backplane() in order to create a backplane
> that knows when a subscriber is added or removed?
>
> I have a CPU intensive process creating stuff that is published to a
> backplane, and I'd like to suspend processing when there aren't any
> subscribers, to minimise load on the system. My first thought was to
> subclass Backplane(), but I'm not getting anywhere with that.

Just to "close the loop", here's my current solution. It's not very
elegant, but it works. (And it'll probably need to be rewritten if the
implementation of Backplane() changes, since it relies on the
"Backplane_O_" service name and on the splitter's outboxsinks attribute.)

# Imports assumed from the usual Axon/Kamaelia module layout.
import Axon.ThreadedComponent
from Axon.CoordinatingAssistantTracker import coordinatingassistanttracker as CAT
from Axon.Ipc import shutdownMicroprocess, producerFinished
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Util.Backplane import PublishTo

class DynamicPublisher(Axon.ThreadedComponent.threadedcomponent):
    """Monitor number of subscribers to a backplane. When it rises
    above 0, the pipeline is started. When it falls to 0 again, the
    pipeline is stopped. The pipeline can also be started by sending
    anything to the component's inbox.
    """
    Inboxes = { "inbox"   : "Any input starts the pipeline",
                "control" : "Receives control messages",
              }
    Outboxes = { "outbox" : "NOT USED",
                 "signal" : "Forwards control messages",
                 "_stop"  : "Used to halt child component, internal",
               }

    def __init__(self, name, **argd):
        super(DynamicPublisher, self).__init__(**argd)
        self.name = name

    def main(self):
        # get backplane's Splitter() component
        cat = CAT.getcat()
        splitter, configbox = cat.retrieveService(
            "Backplane_O_"+self.name)
        self.pipe = None
        while 1:
            # wake up in ten seconds, or when message received
            self.pause(10)
            if self.dataReady('control'):
                # get a control message
                msg = self.recv('control')
                if isinstance(msg,
                              (shutdownMicroprocess, producerFinished)):
                    self.send(msg, 'signal')
                    break
            if self.dataReady('inbox'):
                # any message means a subscriber is about to connect
                msg = self.recv('inbox')
                self.startPipeLine()
            else:
                # check current subscriber numbers
                subscribers = len(splitter.outboxsinks)
                if subscribers > 0:
                    self.startPipeLine()
                else:
                    self.stopPipeLine()
        # shutting down: make sure the child pipeline stops too
        self.stopPipeLine()

    def startPipeLine(self):
        if self.pipe:
            return
        # CPU_intensive_process1..3 stand for the real processing components
        self.pipe = Pipeline(
            CPU_intensive_process1(),
            CPU_intensive_process2(),
            CPU_intensive_process3(),
            PublishTo(self.name),
            ).activate()

    def stopPipeLine(self):
        if not self.pipe:
            return
        # temporarily link to the pipeline's control inbox to shut it down
        self.link((self, '_stop'), (self.pipe, 'control'))
        self.send(shutdownMicroprocess(), '_stop')
        self.unlink(self.pipe)
        self.pipe = None

Whenever I create something that subscribes to the backplane, I send a
message to the DynamicPublisher() inbox. This starts the CPU intensive
processes, if they aren't already running.

Within about ten seconds of the last subscriber disconnecting (the
polling interval in main()), the CPU intensive processes are shut down,
freeing up system resources (or reducing CPU power consumption).
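
The start/stop decisions made on each wake-up can be modelled without
Kamaelia at all. The following is only a stand-alone sketch of that
logic: PipelineSwitch and its wake() method are hypothetical names made
up for illustration, and the list of (message, subscriber count) pairs
stands in for what main() would see after each pause().

```python
class PipelineSwitch(object):
    """Stand-alone model of DynamicPublisher's start/stop decisions.

    Hypothetical helper for illustration only; it does not use Kamaelia.
    """

    def __init__(self):
        self.running = False
        self.log = []   # records each "start" / "stop" transition

    def start(self):
        # Like startPipeLine(): do nothing if already running.
        if not self.running:
            self.running = True
            self.log.append("start")

    def stop(self):
        # Like stopPipeLine(): do nothing if already stopped.
        if self.running:
            self.running = False
            self.log.append("stop")

    def wake(self, inbox_message, subscribers):
        """One pass of the main loop, after pause() returns."""
        if inbox_message:
            # A new subscriber announced itself: start at once.
            self.start()
        elif subscribers > 0:
            self.start()
        else:
            self.stop()


switch = PipelineSwitch()
# (inbox message received?, subscriber count) at successive wake-ups
for message, count in [(False, 0), (True, 0), (False, 1),
                       (False, 2), (False, 0), (False, 0)]:
    switch.wake(message, count)

print(switch.log)
```

In this model, as in main() above, a wake-up message that is not
followed by a real subscription before the next timeout leads to the
pipeline being stopped again.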
--
Jim