On 10/11/2009 14:36, Jim Easterbrook wrote:
> Is it possible to subclass Backplane() in order to create a backplane
> that knows when a subscriber is added or removed?
> 
> I have a CPU intensive process creating stuff that is published to a
> backplane, and I'd like to suspend processing when there aren't any
> subscribers, to minimise load on the system. My first thought was to
> subclass Backplane(), but I'm not getting anywhere with that.

Just to "close the loop", here's my current solution. It's not very
elegant, but it works. (And it'll probably need to be rewritten if the
implementation of Backplane() changes.)

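# Imports the snippet relies on (module paths assume the standard
# Axon/Kamaelia package layout)
import Axon.ThreadedComponent
from Axon.CoordinatingAssistantTracker import coordinatingassistanttracker as CAT
from Axon.Ipc import shutdownMicroprocess, producerFinished
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Util.Backplane import PublishTo

class DynamicPublisher(Axon.ThreadedComponent.threadedcomponent):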
    """Monitor number of subscribers to a backplane. When it rises
    above 0, the pipeline is started. When it falls to 0 again, the
    pipeline is stopped. The pipeline can also be started by sending
    anything to the component's inbox.
    """
    Inboxes = { "inbox"   : "Any input starts the pipeline",
                "control" : "Receives control messages",
              }
    Outboxes = { "outbox" : "NOT USED",
                 "signal" : "Forwards control messages",
                 "_stop"  : "Used to halt child component, internal",
               }
    def __init__(self, name, **argd):
        super(DynamicPublisher, self).__init__(**argd)
        self.name = name
    def main(self):
        # get backplane's Splitter() component
        cat = CAT.getcat()
        splitter, configbox = cat.retrieveService(
            "Backplane_O_"+self.name)
        self.pipe = None
        while 1:
            # wake up in ten seconds, or when message received
            self.pause(10)
            if self.dataReady('control'):
                # get a control message
                msg = self.recv('control')
                if isinstance(msg,
                              (shutdownMicroprocess, producerFinished)):
                    self.send(msg, 'signal')
                    break
            if self.dataReady('inbox'):
                msg = self.recv('inbox')
                self.startPipeLine()
            else:
                # check current subscriber numbers
                subscribers = len(splitter.outboxsinks)
                if subscribers > 0:
                    self.startPipeLine()
                else:
                    self.stopPipeLine()
        self.stopPipeLine()
    def startPipeLine(self):
        if self.pipe:
            return
        self.pipe = Pipeline(
            CPU_intensive_process1(),
            CPU_intensive_process2(),
            CPU_intensive_process3(),
            PublishTo(self.name),
            ).activate()
    def stopPipeLine(self):
        if not self.pipe:
            return
        self.link((self, '_stop'), (self.pipe, 'control'))
        self.send(shutdownMicroprocess(), '_stop')
        self.unlink(self.pipe)
        self.pipe = None

Whenever I create something that subscribes to the backplane, I send a
message to the DynamicPublisher() inbox. This starts the CPU intensive
process, if it isn't already running.

Within about ten seconds of the last subscriber disconnecting (the
polling interval used in main()), the CPU intensive processes are shut
down, freeing up system resources (or reducing CPU power consumption).
-- 
Jim
