+1, although I've already modified this a bit in my branch, I think.

On Apr 13, 2009, at 10:50 AM, Ethan Rowe wrote:

>
> The queue abstract terminus allows the standard indirector behaviors  
> to interact with a message queue broker, such that the indirector's  
> "save" method writes the relevant model object out to a queue on the  
> message broker.  While the indirector's "find" method does not map  
> to a message queue, the queue terminus class offers a "subscribe"  
> method that allows for easy implementation of an event loop,  
> receiving indirected objects saved to a queue as they come in.
>
> Signed-off-by: Ethan Rowe <[email protected]>
> ---
> lib/puppet/indirector/queue.rb |   78 +++++++++++++++++++++++++++++++++++
> spec/unit/indirector/queue.rb  |   87 ++++++++++++++++++++++++++++++++++++++++
> 2 files changed, 165 insertions(+), 0 deletions(-)
> create mode 100644 lib/puppet/indirector/queue.rb
> create mode 100755 spec/unit/indirector/queue.rb
>
> diff --git a/lib/puppet/indirector/queue.rb b/lib/puppet/indirector/queue.rb
> new file mode 100644
> index 0000000..c58af98
> --- /dev/null
> +++ b/lib/puppet/indirector/queue.rb
> @@ -0,0 +1,78 @@
> +require 'puppet/indirector/terminus'
> +require 'puppet/util/queue'
> +require 'yaml'
> +
> +# Implements the <tt>:queue</tt> abstract indirector terminus type, for storing
> +# model instances to a message queue, presumably for the purpose of out-of-process
> +# handling of changes related to the model.
> +#
> +# Relies upon Puppet::Util::Queue for registry and client object management,
> +# and specifies a default queue type of <tt>:stomp</tt>, appropriate for use with a variety of message brokers.
> +#
> +# It's up to the queue client type to instantiate itself correctly based on Puppet configuration information.
> +#
> +# A single queue client is maintained for the abstract terminus, meaning that you can only use one type
> +# of queue client, one message broker solution, etc., with the indirection mechanism.
> +#
> +# Per-indirection queues are assumed, based on the indirection name.  If the <tt>:catalog</tt> indirection makes
> +# use of this <tt>:queue</tt> terminus, queue operations work against the "catalog" queue.  It is up to the queue
> +# client library to handle queue creation as necessary (for a number of popular queuing solutions, queue
> +# creation is automatic and not a concern).
> +class Puppet::Indirector::Queue < Puppet::Indirector::Terminus
> +    extend ::Puppet::Util::Queue
> +    self.queue_type_default = :stomp
> +
> +    # Queue has no idiomatic "find"
> +    def find(request)
> +        nil
> +    end
> +
> +    # Place the request on the queue
> +    def save(request)
> +        begin
> +            Puppet.info "Queueing catalog for %s" % request.key
> +            client.send_message(queue, render(request.instance))
> +        rescue => detail
> +            raise Puppet::Error, "Could not write %s to queue: %s\nInstance::%s\n client : %s" % [request.key, detail,request.instance.to_s,client.to_s]
> +        end
> +    end
> +
> +    def self.queue
> +        indirection_name
> +    end
> +
> +    def queue
> +        self.class.queue
> +    end
> +
> +    # Returns the singleton queue client object.
> +    def client
> +        self.class.client
> +    end
> +
> +    # Formats the model instance associated with _request_ appropriately for message delivery.
> +    # Uses YAML serialization.
> +    def render(obj)
> +        YAML::dump(obj)
> +    end
> +
> +    # Converts the _message_ from serialized format to an actual model instance.
> +    def self.intern(message)
> +        YAML::load(message)
> +    end
> +
> +    # Provides queue subscription functionality; for a given indirection, use this method on the terminus
> +    # to subscribe to the indirection-specific queue.  Your _block_ will be executed per new indirection
> +    # model received from the queue, with _obj_ being the model instance.
> +    def self.subscribe
> +        client.subscribe(queue) do |msg|
> +            begin
> +                yield(self.intern(msg))
> +            rescue => detail
> +                # really, this should log the exception rather than raise it all the way up the stack;
> +                # we don't want exceptions resulting from a single message bringing down a listener
> +                raise Puppet::Error, "Error occurred with subscription to queue %s for indirection %s: %s" % [queue, indirection_name, detail]
> +            end
> +        end
> +    end
> +end
> diff --git a/spec/unit/indirector/queue.rb b/spec/unit/indirector/queue.rb
> new file mode 100755
> index 0000000..de9a27f
> --- /dev/null
> +++ b/spec/unit/indirector/queue.rb
> @@ -0,0 +1,87 @@
> +#!/usr/bin/env ruby
> +
> +require File.dirname(__FILE__) + '/../../spec_helper'
> +require 'puppet/indirector/queue'
> +
> +class Puppet::Indirector::Queue::TestClient
> +    def self.reset
> +        @queues = {}
> +    end
> +
> +    def self.queues
> +        @queues ||= {}
> +    end
> +
> +    def subscribe(queue)
> +        stack = self.class.queues[queue] ||= []
> +        while stack.length > 0 do
> +            yield(stack.shift)
> +        end
> +    end
> +
> +    def send_message(queue, message)
> +        stack = self.class.queues[queue] ||= []
> +        stack.push(message)
> +        queue
> +    end
> +end
> +
> +class FooExampleData
> +    attr_accessor :name
> +end
> +
> +describe Puppet::Indirector::Queue do
> +    before :each do
> +        @indirection = stub 'indirection', :name => :my_queue, :register_terminus_type => nil
> +        Puppet::Indirector::Indirection.stubs(:instance).with(:my_queue).returns(@indirection)
> +        @store_class = Class.new(Puppet::Indirector::Queue) do
> +            def self.to_s
> +                'MyQueue::MyType'
> +            end
> +        end
> +        @store = @store_class.new
> +
> +        @subject_class = FooExampleData
> +        @subject = @subject_class.new
> +        @subject.name = :me
> +
> +        Puppet.settings.stubs(:value).returns("bogus setting data")
> +        Puppet.settings.stubs(:value).with(:queue_client).returns(:test_client)
> +        Puppet::Util::Queue.stubs(:queue_type_to_class).with(:test_client).returns(Puppet::Indirector::Queue::TestClient)
> +        Puppet::Indirector::Queue::TestClient.reset
> +
> +        @request = stub 'request', :key => :me, :instance => @subject
> +    end
> +
> +    it 'should use the correct client type and queue' do
> +        @store.queue.should == :my_queue
> +        @store.client.should be_an_instance_of(Puppet::Indirector::Queue::TestClient)
> +    end
> +
> +    it 'should use render() to convert object to message' do
> +        @store.expects(:render).with(@subject).once
> +        @store.save(@request)
> +    end
> +
> +    it 'should save and restore with the appropriate queue, and handle subscribe block' do
> +        @subject_two = @subject_class.new
> +        @subject_two.name = :too
> +        @store.save(@request)
> +        @store.save(stub('request_two', :key => 'too', :instance => @subject_two))
> +
> +        received = []
> +        @store_class.subscribe do |obj|
> +            received.push(obj)
> +        end
> +
> +        received[0].name.should == @subject.name
> +        received[1].name.should == @subject_two.name
> +    end
> +
> +    it 'should use intern() to convert message to object with subscribe()' do
> +        @store.save(@request)
> +        @store_class.expects(:intern).with(@store.render(@subject)).once
> +        @store_class.subscribe {|o| o }
> +    end
> +end
> +
> -- 
> 1.5.5.1
>
>
> >
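
For reference, the save/subscribe flow described in the patch, together with the log-instead-of-raise behavior its comment in subscribe asks for, can be sketched in plain Ruby. This is only an illustration and is not part of the patch: InMemoryClient and SketchQueue are hypothetical stand-ins (the client is modeled on the TestClient in the spec), no Puppet code is loaded, and plain hashes with string keys stand in for model instances so that YAML round-tripping works without registering custom classes.

```ruby
require 'yaml'

# Hypothetical in-memory stand-in for a queue client, modeled on the
# TestClient in the spec; a real client would talk to a message broker.
class InMemoryClient
  def initialize
    @queues = Hash.new { |hash, name| hash[name] = [] }
  end

  def send_message(queue, message)
    @queues[queue].push(message)
    queue
  end

  # Drains the named queue, yielding each pending message in order.
  def subscribe(queue)
    yield(@queues[queue].shift) until @queues[queue].empty?
  end
end

# Simplified stand-in for the terminus: save serializes to YAML and
# enqueues; subscribe deserializes and hands each object to the block.
class SketchQueue
  attr_reader :errors

  def initialize(client, queue)
    @client = client
    @queue = queue
    @errors = []
  end

  def save(obj)
    @client.send_message(@queue, YAML.dump(obj))
  end

  def subscribe
    @client.subscribe(@queue) do |msg|
      begin
        yield(YAML.load(msg))
      rescue StandardError => detail
        # Record the failure and keep consuming, so that one bad
        # message does not bring down the listener.
        @errors << "queue #{@queue}: #{detail}"
      end
    end
  end
end

terminus = SketchQueue.new(InMemoryClient.new, 'catalog')
terminus.save({ 'name' => 'web01' })
terminus.save({ 'name' => 'bad' })
terminus.save({ 'name' => 'db01' })

received = []
terminus.subscribe do |obj|
  raise ArgumentError, 'cannot process' if obj['name'] == 'bad'
  received << obj['name']
end
```

In this sketch the failing message is collected in errors and the two remaining messages are still delivered; whether the real terminus should log through Puppet or re-raise is a design decision for the patch itself.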


-- 
It is said that power corrupts, but actually it's more true that power
attracts the corruptible. The sane are usually attracted by other things
than power. -- David Brin
---------------------------------------------------------------------
Luke Kanies | http://reductivelabs.com | http://madstop.com

