+1, although I've already modified this a bit in my branch, I think.

On Apr 13, 2009, at 10:50 AM, Ethan Rowe wrote:
>
> The queue abstract terminus allows the standard indirector behaviors
> to interact with a message queue broker, such that the indirector's
> "save" method writes the relevant model object out to a queue on the
> message broker. While the indirector's "find" method does not map
> to a message queue, the queue terminus class offers a "subscribe"
> method that allows for easy implementation of an event loop,
> receiving indirected objects saved to a queue as they come in.
>
> Signed-off-by: Ethan Rowe <[email protected]>
> ---
>  lib/puppet/indirector/queue.rb |   78 +++++++++++++++++++++++++++++++++++
>  spec/unit/indirector/queue.rb  |   87 ++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 165 insertions(+), 0 deletions(-)
>  create mode 100644 lib/puppet/indirector/queue.rb
>  create mode 100755 spec/unit/indirector/queue.rb
>
> diff --git a/lib/puppet/indirector/queue.rb b/lib/puppet/indirector/queue.rb
> new file mode 100644
> index 0000000..c58af98
> --- /dev/null
> +++ b/lib/puppet/indirector/queue.rb
> @@ -0,0 +1,78 @@
> +require 'puppet/indirector/terminus'
> +require 'puppet/util/queue'
> +require 'yaml'
> +
> +# Implements the <tt>:queue</tt> abstract indirector terminus type, for storing
> +# model instances to a message queue, presumably for the purpose of out-of-process
> +# handling of changes related to the model.
> +#
> +# Relies upon Puppet::Util::Queue for registry and client object management,
> +# and specifies a default queue type of <tt>:stomp</tt>, appropriate for use with a variety of message brokers.
> +#
> +# It's up to the queue client type to instantiate itself correctly based on Puppet configuration information.
> +#
> +# A single queue client is maintained for the abstract terminus, meaning that you can only use one type
> +# of queue client, one message broker solution, etc., with the indirection mechanism.
> +#
> +# Per-indirection queues are assumed, based on the indirection name. If the <tt>:catalog</tt> indirection makes
> +# use of this <tt>:queue</tt> terminus, queue operations work against the "catalog" queue. It is up to the queue
> +# client library to handle queue creation as necessary (for a number of popular queuing solutions, queue
> +# creation is automatic and not a concern).
> +class Puppet::Indirector::Queue < Puppet::Indirector::Terminus
> +    extend ::Puppet::Util::Queue
> +    self.queue_type_default = :stomp
> +
> +    # Queue has no idiomatic "find"
> +    def find(request)
> +        nil
> +    end
> +
> +    # Place the request on the queue
> +    def save(request)
> +        begin
> +            Puppet.info "Queueing catalog for %s" % request.key
> +            client.send_message(queue, render(request.instance))
> +        rescue => detail
> +            raise Puppet::Error, "Could not write %s to queue: %s\nInstance::%s\n client : %s" % [request.key, detail,request.instance.to_s,client.to_s]
> +        end
> +    end
> +
> +    def self.queue
> +        indirection_name
> +    end
> +
> +    def queue
> +        self.class.queue
> +    end
> +
> +    # Returns the singleton queue client object.
> +    def client
> +        self.class.client
> +    end
> +
> +    # Formats the model instance associated with _request_ appropriately for message delivery.
> +    # Uses YAML serialization.
> +    def render(obj)
> +        YAML::dump(obj)
> +    end
> +
> +    # converts the _message_ from deserialized format to an actual model instance.
> +    def self.intern(message)
> +        YAML::load(message)
> +    end
> +
> +    # Provides queue subscription functionality; for a given indirection, use this method on the terminus
> +    # to subscribe to the indirection-specific queue. Your _block_ will be executed per new indirection
> +    # model received from the queue, with _obj_ being the model instance.
> +    def self.subscribe
> +        client.subscribe(queue) do |msg|
> +            begin
> +                yield(self.intern(msg))
> +            rescue => detail
> +                # really, this should log the exception rather than raise it all the way up the stack;
> +                # we don't want exceptions resulting from a single message bringing down a listener
> +                raise Puppet::Error, "Error occured with subscription to queue %s for indirection %s: %s" % [queue, indirection_name, detail]
> +            end
> +        end
> +    end
> +end
> diff --git a/spec/unit/indirector/queue.rb b/spec/unit/indirector/queue.rb
> new file mode 100755
> index 0000000..de9a27f
> --- /dev/null
> +++ b/spec/unit/indirector/queue.rb
> @@ -0,0 +1,87 @@
> +#!/usr/bin/env ruby
> +
> +require File.dirname(__FILE__) + '/../../spec_helper'
> +require 'puppet/indirector/queue'
> +
> +class Puppet::Indirector::Queue::TestClient
> +    def self.reset
> +        @queues = {}
> +    end
> +
> +    def self.queues
> +        @queues ||= {}
> +    end
> +
> +    def subscribe(queue)
> +        stack = self.class.queues[queue] ||= []
> +        while stack.length > 0 do
> +            yield(stack.shift)
> +        end
> +    end
> +
> +    def send_message(queue, message)
> +        stack = self.class.queues[queue] ||= []
> +        stack.push(message)
> +        queue
> +    end
> +end
> +
> +class FooExampleData
> +    attr_accessor :name
> +end
> +
> +describe Puppet::Indirector::Queue do
> +    before :each do
> +        @indirection = stub 'indirection', :name => :my_queue, :register_terminus_type => nil
> +        Puppet::Indirector::Indirection.stubs(:instance).with(:my_queue).returns(@indirection)
> +        @store_class = Class.new(Puppet::Indirector::Queue) do
> +            def self.to_s
> +                'MyQueue::MyType'
> +            end
> +        end
> +        @store = @store_class.new
> +
> +        @subject_class = FooExampleData
> +        @subject = @subject_class.new
> +        @subject.name = :me
> +
> +        Puppet.settings.stubs(:value).returns("bogus setting data")
> +        Puppet.settings.stubs(:value).with(:queue_client).returns(:test_client)
> +        Puppet::Util::Queue.stubs(:queue_type_to_class).with(:test_client).returns(Puppet::Indirector::Queue::TestClient)
> +        Puppet::Indirector::Queue::TestClient.reset
> +
> +        @request = stub 'request', :key => :me, :instance => @subject
> +    end
> +
> +    it 'should use the correct client type and queue' do
> +        @store.queue.should == :my_queue
> +        @store.client.should be_an_instance_of(Puppet::Indirector::Queue::TestClient)
> +    end
> +
> +    it 'should use render() to convert object to message' do
> +        @store.expects(:render).with(@subject).once
> +        @store.save(@request)
> +    end
> +
> +    it 'should save and restore with the appropriate queue, and handle subscribe block' do
> +        @subject_two = @subject_class.new
> +        @subject_two.name = :too
> +        @store.save(@request)
> +        @store.save(stub('request_two', :key => 'too', :instance => @subject_two))
> +
> +        received = []
> +        @store_class.subscribe do |obj|
> +            received.push(obj)
> +        end
> +
> +        received[0].name.should == @subject.name
> +        received[1].name.should == @subject_two.name
> +    end
> +
> +    it 'should use intern() to convert message to object with subscribe()' do
> +        @store.save(@request)
> +        @store_class.expects(:intern).with(@store.render(@subject)).once
> +        @store_class.subscribe {|o| o }
> +    end
> +end
> +
> --
> 1.5.5.1
>

--
It is said that power corrupts, but actually it's more true that power
attracts the corruptible. The sane are usually attracted by other things
than power. -- David Brin
---------------------------------------------------------------------
Luke Kanies | http://reductivelabs.com | http://madstop.com
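
The comment inside self.subscribe, about logging the exception rather than raising it all the way up the stack, can be sketched roughly as below. This is only a minimal, self-contained illustration and not the code in either branch: SketchClient and SketchQueue are hypothetical names (the client is modeled on the TestClient in the quoted spec), the messages are plain hashes rather than model instances, and failures are collected in an array where real code would presumably go through Puppet's logging.

```ruby
require 'yaml'

# Hypothetical in-memory stand-in for a queue client, modeled on the
# TestClient in the quoted spec. It is not a real message broker client.
class SketchClient
    def initialize
        @queues = Hash.new { |hash, name| hash[name] = [] }
    end

    def send_message(queue, message)
        @queues[queue].push(message)
        queue
    end

    # Drains whatever is currently queued, yielding each raw message.
    def subscribe(queue)
        messages = @queues[queue]
        yield(messages.shift) until messages.empty?
    end
end

# Hypothetical terminus-like wrapper: YAML out on save, YAML in on subscribe.
class SketchQueue
    attr_reader :errors

    def initialize(client, queue)
        @client = client
        @queue = queue
        @errors = []
    end

    def save(data)
        @client.send_message(@queue, YAML.dump(data))
    end

    # Records a failure for the offending message and keeps going, so one
    # bad message does not bring down the listener.
    def subscribe
        @client.subscribe(@queue) do |msg|
            begin
                yield(YAML.load(msg))
            rescue => detail
                @errors << "queue #{@queue}: #{detail}"
            end
        end
    end
end

queue = SketchQueue.new(SketchClient.new, :catalog)
queue.save({ 'name' => 'me' })
queue.save({ 'name' => 'bad' })
queue.save({ 'name' => 'too' })

received = []
queue.subscribe do |obj|
    raise 'cannot process this one' if obj['name'] == 'bad'
    received << obj['name']
end
```

With the rescue recording the failure instead of re-raising, the message after the bad one still reaches the block. Whether a real listener should also requeue or dead-letter the failed message is a separate question that this sketch does not address.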
