[warning, this is a little on the long side, and really covers 3 - 4 major topics ]
Hi Gloria,

On Mon, Aug 9, 2010 at 4:32 AM, Gloria W <[email protected]> wrote:
> Hi Michael et al,
>
> I am pleased to say that I finally have a freelance contract where I can
> make great use of Kamaelia. I am really happy about transitioning my
> personal Kamaelia work from toy to tool. This is exciting. The license
> change helps greatly.

That's really good to hear :-)

> I have a situation where I want to use a Graphline, but the components are
> dynamic. They can become invalid, and new ones can be required, in realtime.
> Am I forced to add them as subcomponents? Is using __setattr__ in this
> capacity considered a kludge?
> How do I add and remove linkages? Should I destroy and recreate the
> Graphline object each time, instead of all of this manipulation? If so, is
> this slow/inefficient?

You've had a few answers to various questions, so I'll try not to retread
the same ground.

As noted, Graphline declares linkages statically and wires things up. This
isn't a limitation of Axon but a limitation of Graphline, which is after all
intended to be a convenience for managing linkages, but relatively
statically.

Stepping back, a component that takes other components and wires them into
itself is a chassis component. Some chassis components are obvious:

   * Pipeline
   * Graphline

Some are more powerful since they can handle more interesting topologies or
behaviours, but can be slightly confusing:

   * Carousel - Repeatedly restarts the component mentioned

And some are useful in terms of saying "run this, then this, then this" or
"run all of these":

   * Seq - Pipeline(A, Seq(B,C,D), E) is like "A | (B; C; D) | E" in the
     shell.
   * PAR - Run all subcomponents in parallel, collate outboxes into one
     outbox

Then there are chassis type components that manage resources of some kind:

   * Selector
   * DemuxerService
   * Backplane (manages plug splitters) - It's well worth looking at
     backplanes incidentally. I tend to use them extensively as opposed to
     creating new chassis style components.
   * PygameDisplay
   * OpenGLDisplay
   * ServerCore (& hence FastRestartServer & SimpleServer)

Such components tend to want to advertise to the rest of the system that
they're managing services. It was this need to advertise services that led
to the creation of the Co-ordinating Assistant Tracker. The tracker was
always expected to be created, but was expected to be used for statistics
collection (hence the key/value store). In practice, it was realising that
it can be used more like the unix environment that made a difference. For
example most version control systems will launch $EDITOR rather than a
particular editor.

As you'd expect, as time has gone on we've figured out better ways of
writing these things, and better idioms. I'll try and cover these here.

First of all there's the idea of a basic static chassis. It takes one or
more components and wires them up as subcomponents. If we assume that it
does not know in advance how many subcomponents it has, it needs to create
inboxes and outboxes dynamically. It may need to pass data through from its
inbox to a subcomponent's inbox efficiently, and likewise pass data outwards
efficiently. Once it's done this, if it's a static chassis, it probably
wants to go to sleep and only wake up when components shut down in order to
unwire them, and shut down itself when all the children have shut down.

First of all, let's look at how you would re-build the Pipeline component.

I'm also posting this next chunk on a pastebin, where colorising may make
it clearer:

   http://pastebin.com/FiuJWapm -- No comments & runnable example usage
   http://pastebin.com/ctgG0vQC -- inline comments

Having identified this lot, the code looks like this - declare the basic
class, and capture config.

import Axon

class MyPipeline(Axon.Component.component):
    def __init__(self, *comps, **argv):
        super(MyPipeline, self).__init__(**argv)
        if len(comps) < 1:
            raise ValueError("MyPipeline must have at least 1 subcomponent!")
        self.comps = comps

    # Then in the main method...
    def main(self):
        # Create the linkages _inside_ the chassis: each component's outbox
        # and signal feed the next component's inbox and control.
        for i in range(len(self.comps) - 1):
            self.link( (self.comps[i],"outbox"), (self.comps[i+1], "inbox") )
            self.link( (self.comps[i],"signal"), (self.comps[i+1], "control") )

        # Create linkages that pass data through from the outside world into
        # the subcomponents. These need to be marked as "passthrough":
        self.link( (self,"inbox"), (self.comps[0], "inbox"), passthrough=1 )
        self.link( (self,"control"), (self.comps[0], "control"), passthrough=1 )

        # Create linkages that pass data through from the subcomponents to
        # the outside world. These need to be marked as "passthrough", but
        # outbound:
        self.link( (self.comps[-1], "outbox"), (self,"outbox"), passthrough=2 )
        self.link( (self.comps[-1], "signal"), (self,"signal"), passthrough=2 )

        # We now ought to add these components as children and activate them.
        # Marking a component as a child component means that we get woken up
        # when the component exits.
        self.addChildren(*self.comps)
        for child in self.childComponents():
            child.activate()

        # We then want to loop, checking periodically to find out whether the
        # child components have exited, and whether all of them have exited.
        while True:
            # First of all, remove exited child components.
            # If none are left, exit the loop.
            for child in self.childComponents():
                if child._isStopped():       # Means they've exited
                    self.removeChild(child)  # deregisters linkages for us

            if len(self.childComponents()) == 0:
                break

            # Since we do nothing otherwise, simply mark us ready to pause
            # and yield.
            self.pause()
            yield 1

        # Since we only exit the loop once all the child components are shut
        # down, we can simply pass on a producerFinished message. We could
        # skip this if we expected the final component in the pipeline to
        # send on such a message. Which you do is a choice.
        self.send(Axon.Ipc.producerFinished(), "signal")

        # Then just as a cleanup - it shouldn't be needed, but just in case
        # we decide to change our behaviour in future we can also remove the
        # child components (this cleans up a bunch of structures)
        for child in self.childComponents():
            self.removeChild(child)  # deregisters linkages for us

Graphline operates in much the same way.

Suppose instead of this we had a component that had the following
behaviour:

   * Accepts a stream of data for processing in inbox "inbox"
   * Can accept new "processors" on inbox "processor"
   * A processor is simply a component, and also has a filter condition.
   * Outputs from processors are aggregated (and tagged) on the way out of
     the outbox "outbox".

This is a much more complex component.

   * We need to accept new processor requests, which entails
      * Reading the processor requests
      * Adding outboxes to send it data
      * Adding inboxes to receive data from it
      * Tracking the filter function acting as a guard for sending data in
      * Tracking the tag for tagging data on the way out
   * We need to read the primary inbox and, based on filter functions, send
     data on
   * We need to track whether subcomponents (filters) have exited, and if
     so, unlink them
   * Upon shutdown we need to shut down any filters which haven't shut down.

We also need a way of building a test harness for it.

It's a deliberately non-trivial example. Not sure how useful, but it's
interesting.

Since we're adding inboxes & outboxes dynamically, we need to use the
AdaptiveCommsComponent.
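
Before the Axon version, it may help to see just the routing rules on their
own. What follows is a rough plain-Python sketch, not Kamaelia code: the name
`route` and the use of plain functions in place of processor components are
assumptions made purely for illustration. A real component would also
interleave results as the processors run, rather than emitting them in
strict input order.

```python
# Plain-Python model of the routing rules only. No Axon, no inboxes or
# outboxes. `route` and the tuple layout are illustrative assumptions.

def route(data_items, processors):
    """Return tagged results for every processor whose filter accepts an item.

    processors is a list of (tag, filter_func, transform) tuples, standing
    in for the (tag, filter_func, component) requests described above.
    """
    results = []
    for item in data_items:
        for tag, accepts, transform in processors:
            if accepts(item):  # the filter guards what is sent in
                # the tag is attached on the way out
                results.append((tag, transform(item)))
    return results


processors = [
    ("EVEN", lambda x: x % 2 == 0, lambda x: "Even ! " + str(x)),
    ("ODD", lambda x: x % 2 == 1, lambda x: "Odd ! " + str(x)),
]

tagged = route([1, 2, 3], processors)
for result in tagged:
    print(result)
```

Everything the real component adds on top of this (dynamic boxes, lookup
tables, child tracking, shutdown) exists to do the same thing with
independently running components instead of function calls.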

As before, the full code for this component is on pastebin:

   http://pastebin.com/mpcsSKeu - without comments
   http://pastebin.com/HQcDesx4 - with comments

Walking through:

# We declare our class and main inboxes in the usual way, just with the
# new baseclass
class TaggingPluggableProcessor(Axon.AdaptiveCommsComponent.AdaptiveCommsComponent):
    Inboxes = {
        "inbox": "Stream of data to process",
        "processor": "Processor requests are sent here",
        "control" : "So we can shutdown"
    }
    # "_cs" has to be declared because the shutdown code at the end of main
    # links from it. Declaring Outboxes replaces the defaults, so "outbox"
    # and "signal" are listed too.
    Outboxes = {
        "outbox": "Tagged results from processors",
        "signal": "Shutdown messages are passed on here",
        "_cs": "Used to send shutdown messages to processor components"
    }

    def main(self):
        # We'll need a lookup mechanism to find data associated with filter
        # components or associated with filter inboxes, so we can initialise
        # that here:
        self.lookup = {}

        # We will loop until we receive data on our control inbox, at which
        # point we choose to exit the loop
        while not self.dataReady("control"):

            # First of all, let's process any pending processor requests:
            for processor_request in self.Inbox("processor"):
                self.add_processor(processor_request)

            # Then, process all data that needs filtering:
            for data in self.Inbox("inbox"):
                self.process_messages(data)

            # Then, handle any processor components that have exited. We
            # also need to remove them from our lookup tables, so we'll do
            # that here too.
            for child in self.childComponents():
                if child._isStopped():       # Means they've exited
                    self.remove_processor(child)
                    self.removeChild(child)  # deregisters linkages for us

            # Finally, we've cleared "inbox" and "processor". We know
            # "control" is currently empty, so any other messages from any
            # other inboxes are messages from processor/filter components.
            # That means we have results to send on:
            if self.anyReady():  # Means we have data waiting from a processor
                self.send_on_results()

            # We can then set the component to pause and yield.
            if not self.anyReady():
                self.pause()
            yield 1

        # Once the loop exits, we simply pass on whatever message caused us
        # to shutdown
        self.send(self.recv("control"), "signal")

        # Similarly, we also need to notify the filter/processor components
        # that we've ceased sending them any data.
        # This idiom of reusing an outbox is a convenience that tidies up
        # code.
        for c in self.childComponents():
            L = self.link( (self, "_cs"), (c, "control"))
            self.send( Axon.Ipc.producerFinished(), "_cs")
            self.unlink(thelinkage=L)

    # Now we've done the core logic of the processor, we can handle adding
    # new processors.
    # First, let's define the method, and unpack the processor request sent
    # to us:
    def add_processor(self, processor_request):
        tag, filter_func, filter_component = processor_request

        # We then need to create an inbox/outbox pair that we can use to send
        # messages to the processor and receive results from it.

        # Link to component so we can send it data
        to_component = self.addOutbox("_to_component")
        self.link( (self, to_component), (filter_component, "inbox"))

        # Link from component so we can receive results
        from_component = self.addInbox("_from_component")
        self.link( (filter_component, "outbox"), (self, from_component))

        # We then have a useful collection of data about this processor we'd
        # like to track, so let's put that data into a bundle ...
        bundle = {
            "tag": tag,
            "filter_func": filter_func,
            "component": filter_component,
            "to_component" : to_component,
            "from_component" : from_component
        }

        # ...and make it so that we can retrieve this information either
        # based on the actual component or based on the inbox we expect to
        # receive data on from the component.
        self.lookup[filter_component] = bundle
        self.lookup[from_component] = bundle

        # Penultimately, we want to be woken up when this component exits, so
        # we mark it as a child component...
        self.addChildren(filter_component)

        # And finally activate the component
        filter_component.activate()

    # Actually processing messages then is somewhat simpler - we simply loop
    # through our child components, look up their bundle, use its filter
    # function to check the processor should process it and pass it on.
    def process_messages(self, data):
        for child in self.childComponents():
            bundle = self.lookup[child]
            if bundle["filter_func"](data):
                self.send(data, bundle["to_component"])

    # Similarly, when sending on results we can take advantage of the fact
    # that self.anyReady() returns False if there is no inbox with ready
    # data, but returns the inbox name if there is data ready on any inbox.
    # This means we can simply repeatedly call anyReady() and if the result
    # is not false, empty that inbox passing on values to the main outbox.
    # The only addition to this is that we've chosen to tag data on the way
    # out, so for that we use the inbox name returned to look up the data
    # bundle, to find the correct tag.
    def send_on_results(self):
        inbox_ready = self.anyReady()
        while inbox_ready:
            bundle = self.lookup[inbox_ready]
            for message in self.Inbox(inbox_ready):
                self.send( (bundle["tag"], message), "outbox")
            inbox_ready = self.anyReady()

    # Finally removing a processor is about cleanup. For this we remove
    # processors by component, so we use the component to look up the bundle,
    # remove references from the bundle dict, and delete the entries in the
    # lookup table.
    def remove_processor(self, child):
        bundle = self.lookup[child]
        from_component = bundle["from_component"]
        del self.lookup[ from_component ]
        del self.lookup[ child ]
        for k in bundle:
            bundle[k] = None

# In order to build a test harness for this, we can use Graphline,
# Pipeline, DataSource, PureTransformer & Seq. In order to ensure that the
# TaggingPluggableProcessor is ready we can introduce a delay into the data
# source to process using Seq & a Pauser component.
import time
import Axon
from Kamaelia.Util.Console import ConsoleEchoer
from Kamaelia.Util.DataSource import DataSource
from Kamaelia.Util.PureTransformer import PureTransformer
from Kamaelia.Chassis.Pipeline import Pipeline
from Kamaelia.Chassis.Graphline import Graphline
from Kamaelia.Chassis.Seq import Seq

class Pauser(Axon.ThreadedComponent.threadedcomponent):
    def main(self):
        time.sleep(1)

# In particular, this is an effective test harness:

Graphline(
    DATASOURCE = Seq( Pauser(), DataSource([1,2,3,4,5,6]) ),
    PROCESSOR = TaggingPluggableProcessor(),
    PROCESSORSOURCE = DataSource([
        ( "EVEN", lambda x: x % 2==0,
          PureTransformer(lambda x: "Even ! " + str(x)) ),
        ( "ODD", lambda x: x % 2==1,
          PureTransformer(lambda x: "Odd ! " + str(x)) ),
        ( "THREE", lambda x: x % 3==0,
          PureTransformer(lambda x: "Divisible by 3 ! " + str(x)) ),
        ( "FOUR", lambda x: x % 4==0,
          PureTransformer(lambda x: "Divisible by 4 ! " + str(x)) ),
    ]),
    CONSOLE = Pipeline( PureTransformer(lambda x: repr(x)+"\n"),
                        ConsoleEchoer() ),
    linkages = {
        ("DATASOURCE","outbox"): ("PROCESSOR", "inbox"),
        ("DATASOURCE","signal"): ("PROCESSOR", "control"),
        ("PROCESSORSOURCE","outbox"): ("PROCESSOR","processor"),
        ("PROCESSOR","outbox"): ("CONSOLE", "inbox"),
        ("PROCESSOR","signal"): ("CONSOLE", "control"),
    }
).run()

Hopefully these 2 examples help somewhat with the idea of chassis
components.

Regarding timers BTW, there are lots of ways to do these, and my preferred
way is to use a threaded component, explicitly prodding the component.
Eg:

class Prodder(Axon.ThreadedComponent.threadedcomponent):
    pause = 0.5
    message = "ping"
    def main(self):
        while not self.dataReady("control"):
            time.sleep(self.pause)
            self.send(self.message, "outbox")
        self.send(self.recv("control"), "signal")

# pause and message are Prodder's settings, so they are passed to Prodder
Pipeline(Prodder(), ConsoleEchoer()).activate()
Pipeline(Prodder(pause = 1), ConsoleEchoer()).activate()
Pipeline(Prodder(pause = 2, message="pong"), ConsoleEchoer()).activate()
Pipeline(Prodder(message="pong"), ConsoleEchoer()).run()

However you can combine the ideas of a chassis component with this as well,
so you can add timeouts to systems. My greylisting mail server does just
that, like this:

class GreylistServer(ServerCore):
    ...
    class TCPS(TCPServer):
        CSA = NoActivityTimeout(ConnectedSocketAdapter,
                                timeout=config["inactivity_timeout"],
                                debug=False)

Where NoActivityTimeout is defined here:

   http://code.google.com/p/kamaelia/source/browse/trunk/Code/Python/Kamaelia/Kamaelia/Internet/TimeOutCSA.py#292

Like this:

def NoActivityTimeout(someclass, timeout=2, debug=False):
    """
    This is a factory function that will return a new function object that
    will produce an InactivityChassis with the given timeout and debug
    values. The values specified in timeout, debug, and someclass will be
    used in all future calls to the returned function object.

    someclass - the class to wrap in an InactivityChassis
    timeout - the amount of time to wait before sending the shutdown signal
    debug - the debugger to use
    """
    def maker(self, *args,**argd):
        X = InactivityChassis(someclass(*args,**argd),
                              timeout=timeout, debug=debug)
        return X
    return maker

InactivityChassis is then defined like this:

def InactivityChassis(somecomponent, timeout=2, debug=False):
    """
    This convenience function will link a component up to an ActivityMonitor
    and a ResettableSender that will emit a producerFinished signal within
    timeout seconds if the component does not send any messages on its
    outbox or CreatorFeedback boxes.
    To link the specified component to an external component simply link it
    to the returned chassis's outbox or CreatorFeedback outboxes.
    """
    linkages = {
        [ snipped in mail for space ]
    }
    return Graphline(
        OBJ=somecomponent,
        ACT=ActivityMonitor(),
        SHUTTERDOWNER=ResettableSender(debug=debug,
                                       message=producerFinished(),
                                       timeout=timeout),
        linkages = linkages
    )

i.e. this has something that WILL send a shutdown message eventually, based
on a timer that counts down. That timer can be reset, meaning that any
activity resets the countdown. As a result an inactive connection gets shut
down after a certain time period.

Underneath all that, though, how does ResettableSender work?

class ResettableSender(Axon.ThreadedComponent.threadedcomponent):
    """
    This component represents a simple way of making a timed event occur.
    By default, it is set up to send a timeout message "NEXT" within 5
    seconds. If it receives a message on its inbox, the timer will be reset.
    This component will ignore any input on its control inbox.
    """
    timeout=5
    message="NEXT"
    debug = False
    Inboxes = {
        "inbox" : "Send anything here to reset the timeout countdown.",
        "control" : "As usual",
    }
    Outboxes = {
        "outbox" : "'NEXT' message sent when the timeout occurs.",
        "signal" : "As usual",
    }
    def main(self):
        # print "TIMEOUT", repr(self.timeout)
        now = time.time()
        while not self.dataReady("control"):
            time.sleep(1)  # Yes, plain old time.sleep
            if self.dataReady("inbox"):
                while self.dataReady("inbox"):
                    self.recv("inbox")
                now = time.time()
            if time.time() - now > self.timeout:
                break
            elif self.debug:
                print "."
        self.send(self.message, "outbox")
        # print "SHUTDOWN", self.name
        if self.dataReady("control"):
            self.send(self.recv("control"), "signal")
        else:
            self.send(Axon.Ipc.producerFinished(), "signal")

Incidentally, due to using inheritable defaults, any server can use this
like this:

class MyServer(ServerCore):
    logfile = config["greylist_log"]
    debuglogfile = config["greylist_debuglog"]
    socketOptions=(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    class TCPS(TCPServer):
        CSA = NoActivityTimeout(ConnectedSocketAdapter,
                                timeout=config["inactivity_timeout"],
                                debug=False)

MyServer(protocol=..., port= ...).run()

Finally, I've been doing a fair amount of time related work using Kamaelia
recently, and I'm very much of the opinion that using time.sleep inside a
threaded component is by far and away the simplest thing to do.

Hope that all helps,

> Many thank yous, and I'm really happy to be back here.

You're welcome, and good to hear from you!

Regards,

Michael.
