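This patch gives Puppet the ability to find a puppet master via SRV records in DNS. Puppet first tries to resolve the server parameter from puppet.conf (or supplied on the command line) as an SRV record; if no SRV records exist for that name, it falls back to treating the value as a regular host and uses the configured port. When SRV records are found, servers are tried in priority order, with a weighted random choice among records of the same priority, and the next server is tried if a connection fails. In effect, this patch adds client-side load-balancing.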
Signed-off-by: Andrew J. Forgue <[email protected]> --- lib/puppet/indirector/rest.rb | 62 +++++++++++++++++++++++++++--- lib/puppet/network/resolver.rb | 74 ++++++++++++++++++++++++++++++++++++ lib/puppet/type/file/content.rb | 27 +++++++++---- spec/unit/network/resolver_spec.rb | 68 +++++++++++++++++++++++++++++++++ 4 files changed, 216 insertions(+), 15 deletions(-) create mode 100644 lib/puppet/network/resolver.rb create mode 100755 spec/unit/network/resolver_spec.rb diff --git a/lib/puppet/indirector/rest.rb b/lib/puppet/indirector/rest.rb index eb41ff3..405190c 100644 --- a/lib/puppet/indirector/rest.rb +++ b/lib/puppet/indirector/rest.rb @@ -4,6 +4,7 @@ require 'uri' require 'puppet/network/http_pool' require 'puppet/network/http/api/v1' require 'puppet/network/http/compression' +require 'puppet/network/resolver' # Access objects via REST class Puppet::Indirector::REST < Puppet::Indirector::Terminus @@ -68,26 +69,75 @@ class Puppet::Indirector::REST < Puppet::Indirector::Terminus end def find(request) - return nil unless result = deserialize(network(request).get(indirection2uri(request), headers)) + result = nil + + Puppet::Network::Resolver.servers(request.server || self.class.server, request.port || self.class.port) do |server, port, remaining| + request.server = server + request.port = port + + begin + result = deserialize(network(request).get(indirection2uri(request), headers)) + break + rescue SystemCallError => e + Puppet.warning "Error connecting to #{server}:#{port}: #{e.message}" + raise unless remaining > 0 + end + end + + return nil unless result result.name = request.key if result.respond_to?(:name=) result end def search(request) - unless result = deserialize(network(request).get(indirection2uri(request), headers), true) - return [] + result = nil + + Puppet::Network::Resolver.servers(request.server || self.class.server, request.port || self.class.port) do |server, port, remaining| + request.server = server + request.port = port + + begin + 
result = deserialize(network(request).get(indirection2uri(request), headers), true) + break + rescue SystemCallError => e + Puppet.warning "Error connecting to #{server}:#{port}: #{e.message}" + raise unless remaining > 0 + end end - result + + result || [] end def destroy(request) raise ArgumentError, "DELETE does not accept options" unless request.options.empty? - deserialize network(request).delete(indirection2uri(request), headers) + + Puppet::Network::Resolver.servers(request.server || self.class.server, request.port || self.class.port) do |server, port, remaining| + request.server = server + request.port = port + + begin + return deserialize network(request).delete(indirection2uri(request), headers) + rescue SystemCallError => e + Puppet.warning "Error connecting to #{server}:#{port}: #{e.message}" + raise unless remaining > 0 + end + end end def save(request) raise ArgumentError, "PUT does not accept options" unless request.options.empty? - deserialize network(request).put(indirection2uri(request), request.instance.render, headers.merge({ "Content-Type" => request.instance.mime })) + + Puppet::Network::Resolver.servers(request.server || self.class.server, request.port || self.class.port) do |server, port, remaining| + request.server = server + request.port = port + + begin + return deserialize network(request).put(indirection2uri(request), request.instance.render, headers.merge({ "Content-Type" => request.instance.mime })) + rescue SystemCallError => e + Puppet.warning "Error connecting to #{server}:#{port}: #{e.message}" + raise unless remaining > 0 + end + end end private diff --git a/lib/puppet/network/resolver.rb b/lib/puppet/network/resolver.rb new file mode 100644 index 0000000..9165efb --- /dev/null +++ b/lib/puppet/network/resolver.rb @@ -0,0 +1,74 @@ +require 'resolv' +module Puppet::Network; end + +module Puppet::Network::Resolver + + # Iterate through the list of servers that service this hostname + # and yield each server/port since SRV records 
have ports in them + # It will override whatever masterport setting is already set. + def self.servers(hostname, port) + resolver = Resolv::DNS.new + + Puppet.debug "Searching for SRV records for #{hostname}" + + rrs = resolver.getresources(hostname, Resolv::DNS::Resource::IN::SRV) + + # If this hostname doesn't have any SRV records, we just return the + # configured host and port. + unless rrs.empty? + Puppet.debug "Found #{rrs.size} SRV records." + + # Find a list of priorities, try everything in the priority first + # and then move to the next priority, would really like to have used + # Array#group_by here, but puppet supports 1.8.1 + priorities = rrs.map { |record| record.priority }.sort.uniq + + priorities.each do |priority| + priority_group = rrs.select { |rr| rr if rr.priority == priority }.compact + + while next_rr = priority_group.delete(find_weighted_server(priority_group)) + + Puppet.debug "Yielding next server of #{next_rr.target.to_s}:#{next_rr.port}" + + # Remove the RR from the master list and update the count + rrs.delete next_rr + + yield next_rr.target.to_s, next_rr.port, rrs.size + + Puppet.debug "Remaining servers: #{rrs.size}" + end + end + else + Puppet.debug "Found no SRV records for #{hostname}" + + # If there are no SRV records, just return the configured host/port + yield hostname, port, 0 + end + end + + private + + def self.find_weighted_server(records) + return nil if records.nil? || records.empty? + return records.first if records.size == 1 + + # Calculate the sum of all weights in the list of resource records, + # This is used to then select hosts until the weight exceeds what + # random number we selected. For example, if we have weights of 1 8 and 3: + # + # |-|---|--------| + # ^ + # We generate a random number 5, and iterate through the records, adding + # the current record's weight to the accumulator until the weight of the + # current record plus previous records is greater than the random number. 
+ + total_weight = records.inject(0) { |sum,record| sum + record.weight } + current_weight = 0 + chosen_weight = 1 + rand(total_weight) + + records.each do |record| + current_weight += record.weight + return record if current_weight >= chosen_weight + end + end +end diff --git a/lib/puppet/type/file/content.rb b/lib/puppet/type/file/content.rb index b8f30a9..8f6465d 100755 --- a/lib/puppet/type/file/content.rb +++ b/lib/puppet/type/file/content.rb @@ -191,15 +191,24 @@ module Puppet def chunk_file_from_source(source_or_content) request = Puppet::Indirector::Request.new(:file_content, :find, source_or_content.full_path.sub(/^\//,'')) - connection = Puppet::Network::HttpPool.http_instance(source_or_content.server, source_or_content.port) - connection.request_get(indirection2uri(request), add_accept_encoding({"Accept" => "raw"})) do |response| - case response.code - when "404"; nil - when /^2/; uncompress(response) { |uncompressor| response.read_body { |chunk| yield uncompressor.uncompress(chunk) } } - else - # Raise the http error if we didn't get a 'success' of some kind. - message = "Error #{response.code} on SERVER: #{(response.body||'').empty? ? response.message : uncompress_body(response)}" - raise Net::HTTPError.new(message, response) + + Puppet::Network::Resolver.servers(source_or_content.server, source_or_content.port) do |server, port, remaining| + begin + connection = Puppet::Network::HttpPool.http_instance(server, port) + connection.request_get(indirection2uri(request), add_accept_encoding({"Accept" => "raw"})) do |response| + case response.code + when "404"; nil + when /^2/; uncompress(response) { |uncompressor| response.read_body { |chunk| yield uncompressor.uncompress(chunk) } } + else + # Raise the http error if we didn't get a 'success' of some kind. + message = "Error #{response.code} on SERVER: #{(response.body||'').empty? ? 
response.message : uncompress_body(response)}" + raise Net::HTTPError.new(message, response) + end + end + break + rescue SystemCallError => e + Puppet.warning "Error connecting to #{server}:#{port}: #{e.message}" + raise unless remaining > 0 end end end diff --git a/spec/unit/network/resolver_spec.rb b/spec/unit/network/resolver_spec.rb new file mode 100755 index 0000000..3534a4b --- /dev/null +++ b/spec/unit/network/resolver_spec.rb @@ -0,0 +1,68 @@ +#!/usr/bin/env ruby +require File.dirname(__FILE__) + '/../../spec_helper' +require 'puppet/network/resolver' + +describe Puppet::Network::Resolver do + before do + @dns_mock_object = mock('dns') + Resolv::DNS.expects(:new).returns(@dns_mock_object) + + @rr_type = Resolv::DNS::Resource::IN::SRV + @test_srv_hostname = "_puppet._tcp.domain.com" + @test_a_hostname = "puppet.domain.com" + @test_port = 1000 + end + + + describe "when resolving a host without SRV records" do + it "should return the supplied host and port" do + + # No records returned for a DNS entry without any SRV records + @dns_mock_object.expects(:getresources).with(@test_a_hostname, @rr_type).returns([]) + + Puppet::Network::Resolver.servers(@test_a_hostname, @test_port) do |hostname, port, remaining| + hostname.should eql(@test_a_hostname) + port.should eql(@test_port) + remaining.should eql(0) + end + end + end + + describe "when resolving a host with SRV records" do + it "should iterate through records in priority order" do + # The records we should use. 
+ # priority, weight, port, hostname + test_records = [ + Resolv::DNS::Resource::IN::SRV.new(0, 20, 8140, "puppet1.domain.com"), + Resolv::DNS::Resource::IN::SRV.new(0, 100, 8140, "puppet2.domain.com"), + Resolv::DNS::Resource::IN::SRV.new(1, 1, 8140, "puppet3.domain.com"), + Resolv::DNS::Resource::IN::SRV.new(4, 1, 8140, "puppet4.domain.com") + ] + + # The order of the records that should be returned, + # an array means unordered (for weight) + order = { + 0 => ["puppet1.domain.com", "puppet2.domain.com"], + 1 => ["puppet3.domain.com"], + 2 => ["puppet4.domain.com"] + } + + @dns_mock_object.expects(:getresources).with(@test_srv_hostname, @rr_type).returns(test_records) + + Puppet::Network::Resolver.servers(@test_srv_hostname, @test_port) do |hostname, port, remaining| + expected_priority = order.keys.min + + order[expected_priority].should include(hostname) + port.should_not be(@test_port) + + # Remove the host from our expected hosts + order[expected_priority].delete hostname + + remaining.should be(order.inject(0) { |sum, item| sum += item[1].count }) + + # Remove this priority level if we're done with it + order.delete expected_priority if order[expected_priority] == [] + end + end + end +end -- 1.7.1 -- You received this message because you are subscribed to the Google Groups "Puppet Developers" group. To post to this group, send email to [email protected]. To unsubscribe from this group, send email to [email protected]. For more options, visit this group at http://groups.google.com/group/puppet-dev?hl=en.
