[Warning: this is a little on the long side, and really covers three or four
major topics.]

Hi Gloria,


On Mon, Aug 9, 2010 at 4:32 AM, Gloria W <[email protected]> wrote:
>  Hi Michael et al,
>
> I am pleased to say that I finally have a freelance contract where I can
> make great use of Kamaelia. I am really happy about transitioning my
> personal Kamaelia work from toy to tool. This is exciting. The license
> change helps greatly.

That's really good to hear :-)

> I have a situation where I want to use a Graphline, but the components are
> dynamic. They can become invalid, and new ones can be required, in realtime.
> Am I forced to add them as subcomponents? Is using __setattr__ in this
> capacity considered a kludge?
> How do I add and remove linkages? Should I destroy and recreate the
> Graphline object each time, instead of all of this manipulation? If so, is
> this slow/inefficient?

You've had a few answers to various questions, so I'll try not to
retread the same ground.

As noted, Graphline declares its linkages statically and wires things up.
This isn't a limitation of Axon but of Graphline, which is, after all,
intended as a convenience for managing linkages that are relatively
static.

Stepping back, a component that takes other components and wires them
into itself is a chassis component. Some chassis components are
obvious:
   * Pipeline
   * Graphline

Some are more powerful since they can handle more interesting
topologies or behaviours, but can be slightly confusing:
   * PAR - Run all subcomponents in parallel, collate outboxes into one outbox
   * Seq - Pipeline(A, Seq(B,C,D), E) is like "A | (B; C; D) | E" in the shell.
   * Carousel - Repeatedly restarts the component mentioned

PAR and Seq in particular are useful for saying "run all of these" or
"run this, then this, then this".
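As a loose analogy only (plain Python generators, not how Kamaelia implements these chassis), the difference between Seq and PAR can be sketched like this. Each "component" is a hypothetical generator standing in for a component's output: seq() runs them one after another, while par() interleaves them and collates everything into a single output stream.

```python
# Loose analogy, not Kamaelia code: each "component" is a plain generator.

def source(name, n):
    """Hypothetical stand-in for a component that emits n messages."""
    for i in range(n):
        yield "%s%d" % (name, i)

def seq(*components):
    """Like Seq: run each component to completion, one after another."""
    for component in components:
        for item in component:
            yield item

def par(*components):
    """Like PAR: run all components side by side, collating their output."""
    live = list(components)
    while live:
        for component in list(live):
            try:
                yield next(component)
            except StopIteration:
                live.remove(component)  # this one has finished

print(list(seq(source("b", 2), source("c", 2))))  # ['b0', 'b1', 'c0', 'c1']
print(list(par(source("b", 2), source("c", 3))))  # ['b0', 'c0', 'b1', 'c1', 'c2']
```

The round-robin in par() stands in for the scheduler; in Kamaelia the subcomponents are scheduled independently.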

Then there are chassis-type components that manage resources of some kind:
   * Selector
   * DemuxerService
   * Backplane (manages plug splitters)
       - It's well worth looking at backplanes, incidentally. I tend to
         use them extensively rather than creating new chassis-style
         components.
   * PygameDisplay
   * OpenGLDisplay
   * ServerCore (& hence FastRestartServer & SimpleServer)

Such components tend to want to advertise to the rest of the system that
they're managing services. It was this need to advertise services that led
to the creation of the Co-ordinating Assistant Tracker. The tracker was
always planned, but it was originally expected to be used for statistics
collection (hence the key/value store). In practice, what made the
difference was realising that it can be used more like the Unix environment.

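For example, most version control systems launch $EDITOR rather than a
particular editor. Likewise, a component can look up a service by name via
the tracker rather than having a specific component wired in.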
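Stripped of Axon, the $EDITOR idea amounts to a table from names to services. The sketch below is hypothetical: ServiceRegistry and its method names are invented for illustration and are not the tracker's actual API. A component advertises a service as a (component, inbox) pair under a well-known name, and other components look it up by name instead of being wired to a particular component.

```python
class ServiceRegistry(object):
    """Hypothetical name -> (component, inbox) table, in the spirit of $EDITOR."""

    def __init__(self):
        self._services = {}

    def register_service(self, name, component, inbox):
        # Refuse to silently replace a service someone else has advertised.
        if name in self._services:
            raise KeyError("service already registered: %r" % (name,))
        self._services[name] = (component, inbox)

    def retrieve_service(self, name):
        # Raises KeyError if nothing has advertised this service.
        return self._services[name]

    def deregister_service(self, name):
        del self._services[name]


# Strings stand in for real components here.
registry = ServiceRegistry()
registry.register_service("editor", "VimComponent", "inbox")
print(registry.retrieve_service("editor"))  # ('VimComponent', 'inbox')
```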

As you'd expect, as time has gone on we've figured out better ways of
writing these things, and better idioms. I'll try to cover these here.

First of all, there's the idea of a basic static chassis. It takes one or
more components and wires them up as subcomponents. If we assume that it
does not know in advance how many subcomponents it will have, it needs to
create inboxes and outboxes dynamically.

It may need to pass data from its inbox through to a subcomponent's inbox
efficiently, and likewise pass data outwards efficiently. Once it has done
this, if it's a static chassis, it probably wants to go to sleep, waking up
only when components shut down (in order to unwire them), and shutting
itself down when all its children have shut down.
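The shutdown behaviour just described can be modelled without Axon. In this hypothetical sketch, children are plain generators and stepping them round-robin stands in for the scheduler (a real static chassis would pause and only be woken when a child exits). The chassis forgets each child as it exits, the way removeChild() does, and emits a final shutdown message once none are left.

```python
# Framework-free model of a static chassis's lifecycle, not Axon code.

def child(name, steps, log):
    """Hypothetical child component: does `steps` units of work, then exits."""
    for i in range(steps):
        log.append("%s:%d" % (name, i))
        yield

def static_chassis(children, log):
    """Run until every child has exited, forgetting each one as it stops,
    then emit a final shutdown message (cf. producerFinished)."""
    live = dict(children)  # name -> running child
    while live:
        for name in sorted(live):
            try:
                next(live[name])
            except StopIteration:
                del live[name]  # cf. removeChild(): unregister the exited child
                log.append("removed " + name)
        yield
    log.append("producerFinished")

log = []
kids = {"a": child("a", 1, log), "b": child("b", 2, log)}
for _ in static_chassis(kids, log):
    pass
print(log)
# ['a:0', 'b:0', 'removed a', 'b:1', 'removed b', 'producerFinished']
```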

First of all, let's look at how you would rebuild the Pipeline component.

I'm also posting this next chunk on a pastebin, where syntax colouring may
make it clearer:

    http://pastebin.com/FiuJWapm -- No comments & runnable example usage
    http://pastebin.com/ctgG0vQC -- inline comments

Having identified all that, the code looks like this. First, declare the
basic class and capture the config:

import Axon.Component
import Axon.Ipc

class MyPipeline(Axon.Component.component):
    def __init__(self, *comps, **argv):
        super(MyPipeline, self).__init__(**argv)
        if len(comps) < 1:
            raise ValueError("MyPipeline must have at least 1 subcomponent!")
        self.comps = comps

# Then in the main method...
    def main(self):

# Create the linkages _inside_ the chassis:
        middle_components_indices = range(0,len(self.comps[0:-1]))
        for i in middle_components_indices:
            self.link( (self.comps[i],"outbox"), (self.comps[i+1], "inbox") )
            self.link( (self.comps[i],"signal"), (self.comps[i+1], "control") )

# Create linkages that pass data through from the outside world into the
# subcomponents. These need to be marked as "passthrough":

        self.link( (self,"inbox"), (self.comps[0], "inbox"), passthrough=1 )
        self.link( (self,"control"), (self.comps[0], "control"), passthrough=1 )

# Create linkages that pass data through from the subcomponents to the
# outside world . These need to be marked as "passthrough", but outbound:

        self.link( (self.comps[-1], "outbox"), (self,"outbox"), passthrough=2 )
        self.link( (self.comps[-1], "signal"), (self,"signal"), passthrough=2 )

# We now ought to add these components as children and activate them
# Marking a component a child component means that we get woken up
# when the component exits
        self.addChildren(*self.comps)
        for child in self.childComponents():
            child.activate()

# We then loop, checking each time we're woken whether any child
# components have exited, and whether all of them have exited.

        while True:

    # First of all, remove exited child components.
    # If none are left, exit the loop.

            for child in self.childComponents():
                if child._isStopped(): # Means they've exitted
                    self.removeChild(child)   # deregisters linkages for us

            if len(self.childComponents()) == 0:
                break

    # Since we do nothing otherwise, simply mark us ready to pause and yield.
            self.pause()
            yield 1

    # Since we only exit the loop once all the child components are shutdown,
    # we can simply pass on a producerFinished message. We could skip this if
    # we expected the final component in the pipeline to send on such a message.
    # Which you do is a choice.

        self.send(Axon.Ipc.producerFinished(), "signal")

    # Then just as a cleanup - it shouldn't be needed, but just in case we
    # decide to change our behaviour in future we can also remove the child
    # components (this cleans up a bunch of structures)
        for child in self.childComponents():
            self.removeChild(child)   # deregisters linkages for us

Graphline operates in much the same way.
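One way to see how a Graphline-style linkages dictionary maps onto link() calls like those above is to classify each linkage by its passthrough value. This sketch assumes, as in Kamaelia's Graphline, that the empty component name "" refers to the chassis itself; plan_links() is a hypothetical helper, not part of Kamaelia.

```python
def plan_links(linkages):
    """Classify Graphline-style linkages by passthrough value, assuming the
    empty name "" means the chassis itself:
      0 - internal link between two children
      1 - chassis inbox -> child inbox (inbound passthrough)
      2 - child outbox -> chassis outbox (outbound passthrough)
    """
    plan = []
    for source, sink in sorted(linkages.items()):
        if source[0] == "":
            passthrough = 1
        elif sink[0] == "":
            passthrough = 2
        else:
            passthrough = 0
        plan.append((source, sink, passthrough))
    return plan


links = plan_links({
    ("", "inbox"): ("A", "inbox"),
    ("A", "outbox"): ("B", "inbox"),
    ("B", "outbox"): ("", "outbox"),
})
for link in links:
    print(link)
```

These are the same passthrough=1 and passthrough=2 values that MyPipeline passes to self.link().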



Suppose instead that we had a component with the following behaviour:

    * Accepts a stream of data for processing on inbox "inbox"
    * Can accept new "processors" on inbox "processor"
    * A processor is simply a component, supplied together with a filter
      condition (and a tag).
    * Outputs from processors are aggregated (and tagged) on the way
      out of the outbox "outbox".

This is a much more complex component.
    * We need to accept new processor requests, which entails
        * Reading the processor requests
        * Adding outboxes to send it data
        * Adding inboxes to receive data from it
        * Tracking the filter function acting as a guard for sending data in
        * Tracking the tag for tagging data on the way out
    * We need to read the primary inbox and, based on the filter functions,
      send data on
    * We need to track whether subcomponents (filters) have exited, and if
      so, unlink them
    * Upon shutdown we need to shut down any filters which haven't already
      shut down.
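Ignoring the component machinery, the routing rules in that list reduce to: send each item to every processor whose filter accepts it, and tag whatever comes back. Here is a framework-free sketch, where TaggingDispatcher is a hypothetical name and each processor is a plain function called synchronously. (In the real component, processors are separate components and their results arrive asynchronously on per-processor inboxes.)

```python
class TaggingDispatcher(object):
    """Hypothetical model of the routing logic; each processor is
    (tag, filter_func, process_func)."""

    def __init__(self):
        self.processors = []

    def add_processor(self, tag, filter_func, process_func):
        self.processors.append((tag, filter_func, process_func))

    def remove_processor(self, tag):
        self.processors = [p for p in self.processors if p[0] != tag]

    def process(self, data):
        """Offer data to every processor whose filter accepts it and
        return the tagged results, in registration order."""
        results = []
        for tag, filter_func, process_func in self.processors:
            if filter_func(data):
                results.append((tag, process_func(data)))
        return results


d = TaggingDispatcher()
d.add_processor("EVEN", lambda x: x % 2 == 0, lambda x: "Even ! " + str(x))
d.add_processor("THREE", lambda x: x % 3 == 0, lambda x: "Divisible by 3 ! " + str(x))
print(d.process(6))  # [('EVEN', 'Even ! 6'), ('THREE', 'Divisible by 3 ! 6')]
d.remove_processor("EVEN")
print(d.process(6))  # [('THREE', 'Divisible by 3 ! 6')]
```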

We also need a way of building a test harness for it.

It's a deliberately non-trivial example. I'm not sure how useful it is, but
it's interesting.

Since we're adding inboxes & outboxes dynamically, we need to use the
AdaptiveCommsComponent.

As before, this example is here:
     http://pastebin.com/mpcsSKeu - without comments
     http://pastebin.com/HQcDesx4 - with comments

Walking through:

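# We declare our class and main inboxes in the usual way, just with the
# new baseclass. We also declare the extra outbox "_cs", which is used at
# shutdown to send messages to the processors.

import Axon.AdaptiveCommsComponent
import Axon.Ipc

class TaggingPluggableProcessor(Axon.AdaptiveCommsComponent.AdaptiveCommsComponent):
    Inboxes = {
        "inbox": "Stream of data to process",
        "processor": "Processor requests are sent here",
        "control" : "So we can shutdown"
    }
    Outboxes = {
        "outbox": "Tagged results from the processors",
        "signal": "So we can pass on shutdown messages",
        "_cs": "Used to send shutdown messages to processors",
    }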
    def main(self):

# We'll need a lookup mechanism to find data associated with filter components
# or associated with filter inboxes, so we can initialise that here:

        self.lookup = {}

# We will loop until we receive data on our control inbox, at which point we
# choose to exit the loop
        while not self.dataReady("control"):

# First of all, let's process any pending processor requests:
            for processor_request in self.Inbox("processor"):
                self.add_processor(processor_request)

# Then, process all data that needs filtering:
            for data in self.Inbox("inbox"):
                self.process_messages(data)


# Then, handle any processor components that have exited. We also need
# to remove them from our lookup tables, so we'll do that here too.
            for child in self.childComponents():
                if child._isStopped(): # Means they've exitted
                    self.remove_processor(child)
                    self.removeChild(child)   # deregisters linkages for us

# Finally, we've cleared "inbox", and "processor". We know "control" is
# currently empty, so any other messages from any other inboxes are messages
# from processor/filter components. That means we have results to send on:
            if self.anyReady():
                # Means that we have data waiting from a processor
                self.send_on_results()

# We can then set the component to pause and yield.
            if not self.anyReady():
                self.pause()
            yield 1

# Once the loop exits, we simply pass on whatever message caused us to
# shutdown

        self.send(self.recv("control"), "signal")

# Similarly, we also need to notify the filter/processor components that
# we've ceased sending them any data. This idiom of reusing an outbox is
# a convenience that tidies up code.
        for c in self.childComponents():
            L = self.link( (self, "_cs"), (c, "control"))
            self.send( Axon.Ipc.producerFinished(), "_cs")
            self.unlink(thelinkage=L)

# Now that we've done the core logic of the processor, we can handle adding
# new processors.

# First, let's define the method, and unpack the processor request sent to us:
    def add_processor(self, processor_request):
        tag, filter_func, filter_component = processor_request

# We then need to create an inbox/outbox pair that we can use to send
# messages to the processor and receive results from.

        # Link to component so we can send it data
        to_component =  self.addOutbox("_to_component")
        self.link( (self, to_component), (filter_component, "inbox"))

        # Link from component so we can receive results
        from_component = self.addInbox("_from_component")
        self.link( (filter_component, "outbox"), (self, from_component))

# We then have a useful collection of data about this processor we'd like
# to track, so let's put that data into a bundle ...

        bundle = { "tag": tag,
                   "filter_func":filter_func,
                   "component": filter_component,
                   "to_component" : to_component,
                   "from_component" : from_component }

# ...and make it so that we
# can retrieve this information either based on the actual component or
# based on the inbox we expect to receive data on from the component.

        self.lookup[filter_component] = bundle
        self.lookup[from_component] = bundle

# Penultimately, we want to be woken up when this component exits, so we
# mark it a child component...
        self.addChildren(filter_component)

# And finally activate the component
        filter_component.activate()

# Actually processing messages is then somewhat simpler: we simply loop
# through our child components, look up their bundle, use its filter
# function to check whether that processor should handle the data, and if
# so pass it on.

    def process_messages(self, data):
        for child in self.childComponents():
            bundle = self.lookup[child]
            if bundle["filter_func"](data):
                self.send(data, bundle["to_component"])

# Similarly, when sending on results we can take advantage of the fact
# that self.anyReady() returns False if there is no inbox with ready
# data, but returns the inbox name if there is data ready on any inbox.

# This means we can simply repeatedly call anyReady() and if the result
# is not false, empty that inbox passing on values to the main outbox.
# The only addition to this is that we've chosen to tag data on the way out,
# so for that we use the inbox name returned to lookup the data bundle,
# to find the correct tag.
    def send_on_results(self):
        inbox_ready = self.anyReady()
        while inbox_ready:
            bundle = self.lookup[inbox_ready]
            for message in self.Inbox(inbox_ready):
                self.send( (bundle["tag"], message), "outbox")
            inbox_ready = self.anyReady()

# Finally, removing a processor is about cleanup. For this we remove
# processors by component, so we use the component to lookup the bundle,
# remove references from the bundle dict, and delete the entries in the
# lookup table.
    def remove_processor(self, child):
        bundle = self.lookup[child]
        from_component = bundle["from_component"]
        del self.lookup[ from_component ]
        del self.lookup[ child ]
        for k in bundle:
            bundle[k] = None

# To build a test harness for this, we can use Graphline, Pipeline,
# DataSource, PureTransformer & Seq. To make sure the processors have been
# added to the TaggingPluggableProcessor before any data arrives, we delay
# the data source using Seq and a Pauser component.

from Kamaelia.Util.Console import ConsoleEchoer
from Kamaelia.Util.DataSource import DataSource
from Kamaelia.Util.PureTransformer import PureTransformer
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Chassis.Graphline import Graphline
from Kamaelia.Chassis.Seq import Seq

import time
import Axon.ThreadedComponent

# Sleeps for a second then exits, so Seq moves on to the DataSource.
class Pauser(Axon.ThreadedComponent.threadedcomponent):
    def main(self):
        time.sleep(1)

# In particular, this is an effective test harness:

Graphline(
    DATASOURCE = Seq( Pauser(), DataSource([1,2,3,4,5,6]) ),
    PROCESSOR = TaggingPluggableProcessor(),
    PROCESSORSOURCE = DataSource([
        ( "EVEN",  lambda x: x % 2 == 0,
                   PureTransformer(lambda x: "Even ! " + str(x)) ),
        ( "ODD",   lambda x: x % 2 == 1,
                   PureTransformer(lambda x: "Odd ! " + str(x)) ),
        ( "THREE", lambda x: x % 3 == 0,
                   PureTransformer(lambda x: "Divisible by 3 ! " + str(x)) ),
        ( "FOUR",  lambda x: x % 4 == 0,
                   PureTransformer(lambda x: "Divisible by 4 ! " + str(x)) ),
    ]),
    CONSOLE = Pipeline( PureTransformer(lambda x: repr(x)+"\n"),
                        ConsoleEchoer() ),
    linkages = {
        ("DATASOURCE","outbox"): ("PROCESSOR", "inbox"),
        ("DATASOURCE","signal"): ("PROCESSOR", "control"),
        ("PROCESSORSOURCE","outbox"): ("PROCESSOR","processor"),
        ("PROCESSOR","outbox"): ("CONSOLE", "inbox"),
        ("PROCESSOR","signal"): ("CONSOLE", "control"),
    }
).run()
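The anyReady()-based draining in send_on_results() can be modelled with plain queues. This sketch assumes, as described above, that "any ready" returns the name of a non-empty inbox, or False if there is none. Mailboxes, any_ready() and the inbox names are hypothetical stand-ins, and the per-inbox tag table plays the role of the lookup bundles.

```python
from collections import deque

class Mailboxes(object):
    """Hypothetical stand-in for a component's inboxes: name -> queue."""
    def __init__(self, *names):
        self.boxes = dict((name, deque()) for name in names)

    def deliver(self, name, message):
        self.boxes[name].append(message)

    def any_ready(self):
        """Return the name of some non-empty inbox, or False if all are empty."""
        for name in sorted(self.boxes):
            if self.boxes[name]:
                return name
        return False

def send_on_results(mailboxes, tags, outbox):
    """Drain every ready inbox, tagging each message with its inbox's tag."""
    ready = mailboxes.any_ready()
    while ready:
        box = mailboxes.boxes[ready]
        while box:
            outbox.append((tags[ready], box.popleft()))
        ready = mailboxes.any_ready()

m = Mailboxes("_from_component1", "_from_component2")
m.deliver("_from_component2", "Odd ! 3")
m.deliver("_from_component1", "Even ! 2")
out = []
send_on_results(m, {"_from_component1": "EVEN", "_from_component2": "ODD"}, out)
print(out)  # [('EVEN', 'Even ! 2'), ('ODD', 'Odd ! 3')]
```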

Hopefully these two examples help somewhat with the idea of chassis components.

Regarding timers, BTW, there are lots of ways to do these, and my
preferred way is to use a threaded component that explicitly prods the
component that needs waking. E.g.:

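import time
import Axon.ThreadedComponent
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Util.Console import ConsoleEchoer

class Prodder(Axon.ThreadedComponent.threadedcomponent):
    pause = 0.5          # seconds between messages
    message = "ping"
    def main(self):
        # Plain time.sleep is fine here: we're running in our own thread
        while not self.dataReady("control"):
            time.sleep(self.pause)
            self.send(self.message, "outbox")
        self.send(self.recv("control"), "signal")

# pause & message are defaults, overridable per instance via keywords:
Pipeline(Prodder(), ConsoleEchoer()).activate()
Pipeline(Prodder(pause = 1), ConsoleEchoer()).activate()
Pipeline(Prodder(pause = 2, message="pong"), ConsoleEchoer()).activate()
Pipeline(Prodder(message="pong"), ConsoleEchoer()).run()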
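For comparison, here is the same "sleep, then prod" idea using only Python's standard library rather than Axon. It's a sketch: the threading.Event stands in for the control inbox and a queue.Queue for the outbox, and the class is not a drop-in replacement for the Axon component.

```python
import threading
import time
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

class Prodder(threading.Thread):
    """Plain-threading analogue of Prodder: sleeps, then sends a message,
    until asked to stop."""
    def __init__(self, outbox, pause=0.5, message="ping"):
        threading.Thread.__init__(self)
        self.daemon = True
        self.outbox = outbox
        self.pause = pause
        self.message = message
        self.control = threading.Event()  # stands in for the "control" inbox

    def run(self):
        while not self.control.is_set():
            time.sleep(self.pause)        # plain old time.sleep, in its own thread
            self.outbox.put(self.message)

out = queue.Queue()
p = Prodder(out, pause=0.01, message="pong")
p.start()
received = [out.get(timeout=1) for _ in range(3)]
p.control.set()   # ask the thread to stop
p.join(timeout=1)
print(received)   # ['pong', 'pong', 'pong']
```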

However, you can also combine the idea of a chassis component with this,
which lets you add timeouts to systems. My greylisting mail server does
just that, like this:

class GreylistServer(ServerCore):
...
    class TCPS(TCPServer):
        CSA = NoActivityTimeout(ConnectedSocketAdapter,
                                timeout=config["inactivity_timeout"], debug=False)

Where NoActivityTimeout is defined here:
http://code.google.com/p/kamaelia/source/browse/trunk/Code/Python/Kamaelia/Kamaelia/Internet/TimeOutCSA.py#292

Like this:

def NoActivityTimeout(someclass, timeout=2, debug=False):
    """
    This is a factory function that will return a new function object that will
    produce an InactivityChassis with the given timeout and debug values.  The
    values specified in timeout, debug, and someclass will be used in all future
    calls to the returned function object.

    someclass - the class to wrap in an InactivityChassis
    timeout - the amount of time to wait before sending the shutdown signal
    debug - whether to produce debugging output
    """
    def maker(self, *args,**argd):
        X = InactivityChassis(someclass(*args,**argd),
                              timeout=timeout, debug=debug)
        return X
    return maker
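The reason maker() takes a self argument is that it ends up stored as a class attribute (CSA), and a plain function stored on a class and called through an instance receives that instance first (presumably TCPServer calls it as self.CSA(...)). The sketch below shows the pattern in isolation. TimeoutWrapper, Connection and Server are hypothetical stand-ins for InactivityChassis, ConnectedSocketAdapter and the server class, and handle_new_connection() is an invented caller.

```python
class TimeoutWrapper(object):
    """Hypothetical stand-in for InactivityChassis: just records its config."""
    def __init__(self, inner, timeout, debug):
        self.inner = inner
        self.timeout = timeout
        self.debug = debug


class Connection(object):
    """Hypothetical stand-in for ConnectedSocketAdapter."""
    def __init__(self, sock):
        self.sock = sock


def no_activity_timeout(someclass, timeout=2, debug=False):
    # timeout and debug are captured by the closure and reused on every call.
    def maker(self, *args, **argd):
        # `self` is whatever instance the attribute was looked up on;
        # it is accepted and ignored.
        return TimeoutWrapper(someclass(*args, **argd),
                              timeout=timeout, debug=debug)
    return maker


class Server(object):
    # A plain function stored on the class becomes a method of instances.
    CSA = no_activity_timeout(Connection, timeout=30)

    def handle_new_connection(self, sock):
        return self.CSA(sock)   # Python passes this Server as maker's `self`


wrapped = Server().handle_new_connection("fake-socket")
print("%s %s %d" % (type(wrapped.inner).__name__, wrapped.inner.sock, wrapped.timeout))
# Connection fake-socket 30
```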

InactivityChassis is then defined like this:

def InactivityChassis(somecomponent, timeout=2, debug=False):
    """
    This convenience function will link a component up to an ActivityMonitor and
    a ResettableSender that will emit a producerFinished signal within timeout
    seconds if the component does not send any messages on its outbox or
    CreatorFeedback boxes.  To link the specified component to an external
    component, simply link it to the returned chassis's outbox or
    CreatorFeedback outboxes.
    """
    linkages = {
[ snipped in mail for space ]
    }
    return Graphline(
        OBJ=somecomponent,
        ACT=ActivityMonitor(),
        SHUTTERDOWNER=ResettableSender(debug=debug,
                                       message=producerFinished(), timeout=timeout),
        linkages = linkages
    )

i.e. this has something that WILL send a shutdown message eventually, based
on a timer that counts down. That timer can be reset, so any activity
restarts the countdown. As a result, an inactive connection gets shut down
after a certain time period.
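The resulting behaviour can be checked with a small deterministic model: given the times at which activity happens, when does the shutdown fire? This ignores ResettableSender's one-second polling and is purely illustrative; shutdown_time() is a hypothetical helper.

```python
def shutdown_time(activity_times, timeout, start=0.0):
    """Model of a resettable countdown: it starts at `start`, any activity
    at or before the current deadline restarts it, and the shutdown fires
    at the first deadline that passes with no activity."""
    deadline = start + timeout
    for t in sorted(activity_times):
        if t > deadline:
            break              # too late: the timer has already fired
        deadline = t + timeout
    return deadline

print(shutdown_time([1, 2, 3], timeout=2))  # 5: each event pushes the deadline on
print(shutdown_time([1, 5], timeout=2))     # 3: activity at 5 comes after the deadline
print(shutdown_time([], timeout=2))         # 2.0: no activity at all
```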

Underneath all that, though, how does ResettableSender work?

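import time
import Axon.Ipc
import Axon.ThreadedComponent

class ResettableSender(Axon.ThreadedComponent.threadedcomponent):
    """
    This component represents a simple way of making a timed event occur.
    By default, it is set up to send the timeout message "NEXT" once about
    5 seconds have passed without activity (it checks once a second). If it
    receives a message on its inbox, the timer will be reset. A message on
    its control inbox makes it send its message immediately and then pass
    the control message on via its signal outbox.
    """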
    timeout=5
    message="NEXT"
    debug = False

    Inboxes = {
        "inbox"   : "Send anything here to reset the timeout countdown.",
        "control" : "As usual",
    }

    Outboxes = {
        "outbox" : "'NEXT' message sent when the timeout occurs.",
        "signal" : "As usual",
    }

    def main(self):
        # print "TIMEOUT", repr(self.timeout)
        now = time.time()
        while not self.dataReady("control"):
            time.sleep(1) # Yes, plain old time.sleep
            if self.dataReady("inbox"):
                while self.dataReady("inbox"):
                    self.recv("inbox")
                now = time.time()
            if time.time() - now > self.timeout:
                break
            elif self.debug:
                print(".")
        self.send(self.message, "outbox")

        # print "SHUTDOWN", self.name

        if self.dataReady("control"):
            self.send(self.recv("control"), "signal")
        else:
            self.send(Axon.Ipc.producerFinished(), "signal")

Incidentally, because components use inheritable defaults, any server can
use this in the same way:

class MyServer(ServerCore):
    logfile = config["greylist_log"]
    debuglogfile = config["greylist_debuglog"]
    socketOptions=(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    class TCPS(TCPServer):
        CSA = NoActivityTimeout(ConnectedSocketAdapter,
                                timeout=config["inactivity_timeout"], debug=False)

MyServer(protocol=..., port= ...).run()
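The "inheritable defaults" idiom itself is simple enough to sketch in plain Python. This assumes, as the examples above rely on, that keyword arguments passed to a component's constructor simply override class-level defaults. Component, Sender and QuickSender are hypothetical names; overriding CSA in a TCPS subclass is the same move as QuickSender overriding timeout.

```python
class Component(object):
    """Hypothetical base class: keyword arguments become instance attributes,
    overriding any class-level defaults of the same name."""
    def __init__(self, **argd):
        self.__dict__.update(argd)


class Sender(Component):
    timeout = 5          # class-level defaults
    message = "NEXT"


class QuickSender(Sender):
    timeout = 1          # a subclass overrides one default, inherits the other


a = Sender()
b = Sender(message="ping")   # per-instance override
c = QuickSender()
print("%s %s" % (a.timeout, a.message))  # 5 NEXT
print("%s %s" % (b.timeout, b.message))  # 5 ping
print("%s %s" % (c.timeout, c.message))  # 1 NEXT
```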

Finally, I've been doing a fair amount of time-related work using Kamaelia
recently, and I'm very much of the opinion that using time.sleep inside a
threaded component is by far the simplest thing to do.

Hope that all helps,

> Many thank yous, and I'm really happy to be back here.

You're welcome, and good to hear from you!

Regards,


Michael.

-- 
You received this message because you are subscribed to the Google Groups 
"kamaelia" group.
To post to this group, send email to [email protected].
To unsubscribe from this group, send email to 
[email protected].
For more options, visit this group at 
http://groups.google.com/group/kamaelia?hl=en.
