Hi,

I'm proud to release the second part of the puppet streaming patch, which this time deals with HTTP compression. This patch is based on the #3373 patch I released last week, with Markus' comments incorporated, which itself was based on Luke's checksum patch. So it cannot be applied directly on testing (I'm still waiting for testing to be merged into master before rebasing).
A quick and unscientific benchmark: using the same 100MiB file as in the #3373 benchmarks, with the master running behind mongrel+nginx:

Compression disabled: time: 22.68s
Compression enabled: time: 11.83s

Of course this is completely biased, since the 100MiB file in question is highly compressible (it is full of null bytes :-)). Stay tuned for more benchmarks.

The patch is available in my github repository in the tickets/master/3408 branch:
http://github.com/masterzen/puppet/tree/tickets/master/3408

Please review, especially my attempt in compression.rb to work around the zlib extension being missing in some versions of ruby.

Thanks,
Brice

Original commit msg:
This patch adds HTTP response decompression (both gzip and deflate streams).
This feature is disabled by default, and enabled with --http_compression.
This feature can be activated only if the local ruby version supports the zlib ruby extension.

HTTP response decompression is active for all REST communications and file sourcing.

To enable http compression on the server side, a reverse proxy such as Apache or Nginx is needed, with the appropriate configuration:

Nginx:
gzip on;
gzip_types text/pson text/json text/marshall text/yaml application/x-raw text/plain;

Apache:
LoadModule deflate_module /usr/lib/apache2/modules/mod_deflate.so
AddOutputFilterByType DEFLATE text/plain text/pson text/json text/marshall text/yaml application/x-raw

Signed-off-by: Brice Figureau <[email protected]>
---
 lib/puppet/defaults.rb                   |    8 +-
 lib/puppet/indirector/rest.rb            |   14 ++-
 lib/puppet/network/http/compression.rb   |  112 +++++++++++++++++
 lib/puppet/type/file/content.rb          |    8 +-
 spec/integration/defaults.rb             |    4 +
 spec/unit/indirector/certificate/rest.rb |    1 +
 spec/unit/indirector/rest.rb             |   35 +++++-
 spec/unit/network/http/compression.rb    |  199 ++++++++++++++++++++++++++++++
 spec/unit/type/file/content.rb           |   50 ++++++++
 9 files changed, 421 insertions(+), 10 deletions(-)
 create mode 100644 lib/puppet/network/http/compression.rb
 create mode 100644 spec/unit/network/http/compression.rb

diff --git a/lib/puppet/defaults.rb b/lib/puppet/defaults.rb
index b2e8492..a0dbfec 100644
--- a/lib/puppet/defaults.rb
+++ b/lib/puppet/defaults.rb
@@ -607,7 +607,13 @@ module Puppet
         :graph => [false, "Whether to create dot graph files for the different configuration graphs. These dot files can be interpreted by tools like OmniGraffle or dot (which is part of ImageMagick)."],
-        :graphdir => ["$statedir/graphs", "Where to store dot-outputted graphs."]
+        :graphdir => ["$statedir/graphs", "Where to store dot-outputted graphs."],
+        :http_compression => [false, "Allow http compression in REST communication with the master.
+            This setting might improve performance for puppetd -> puppetmasterd communications over slow WANs.
+            Your puppetmaster needs to support compression (usually by activating some settings in a reverse-proxy
+            in front of the puppetmaster, which rules out webrick).
+            It is harmless to activate this settings if your master doesn't support
+            compression, but if it supports it, this setting might reduce performance on high-speed LANs."]
     )
     # Plugin information.
diff --git a/lib/puppet/indirector/rest.rb b/lib/puppet/indirector/rest.rb
index a89e986..f9b11c1 100644
--- a/lib/puppet/indirector/rest.rb
+++ b/lib/puppet/indirector/rest.rb
@@ -3,10 +3,12 @@ require 'uri'
 require 'puppet/network/http_pool'
 require 'puppet/network/http/api/v1'
+require 'puppet/network/http/compression'
 # Access objects via REST
 class Puppet::Indirector::REST < Puppet::Indirector::Terminus
     include Puppet::Network::HTTP::API::V1
+    include Puppet::Network::HTTP::Compression.module
     class << self
         attr_reader :server_setting, :port_setting
@@ -43,22 +45,24 @@ class Puppet::Indirector::REST < Puppet::Indirector::Terminus
             content_type = response['content-type'].gsub(/\s*;.*$/,'') # strip any appended charset
+            body = uncompress_body(response)
+
             # Convert the response to a deserialized object.
             if multiple
-                model.convert_from_multiple(content_type, response.body)
+                model.convert_from_multiple(content_type, body)
             else
-                model.convert_from(content_type, response.body)
+                model.convert_from(content_type, body)
             end
         else
             # Raise the http error if we didn't get a 'success' of some kind.
-            message = "Error %s on SERVER: %s" % [response.code, (response.body||'').empty? ? response.message : response.body]
+            message = "Error %s on SERVER: %s" % [response.code, (response.body||'').empty? ? response.message : uncompress_body(response)]
             raise Net::HTTPError.new(message, response)
         end
     end
     # Provide appropriate headers.
     def headers
-        {"Accept" => model.supported_formats.join(", ")}
+        add_accept_encoding({"Accept" => model.supported_formats.join(", ")})
     end
     def network(request)
@@ -67,7 +71,7 @@ class Puppet::Indirector::REST < Puppet::Indirector::Terminus
     def find(request)
         return nil unless result = deserialize(network(request).get(indirection2uri(request), headers))
-        result.name = request.key
+        result.name = request.key if result.respond_to?(:name=)
         result
     end
diff --git a/lib/puppet/network/http/compression.rb b/lib/puppet/network/http/compression.rb
new file mode 100644
index 0000000..722ecfe
--- /dev/null
+++ b/lib/puppet/network/http/compression.rb
@@ -0,0 +1,112 @@
+require 'puppet/network/http'
+
+module Puppet::Network::HTTP::Compression
+
+    # this module function allows to use the right underlying
+    # methods depending on zlib presence
+    def module
+        return Active if Puppet.features.zlib?
+        return None
+    end
+    module_function :module
+
+    module Active
+        require 'zlib'
+        require 'stringio'
+
+        # return an uncompressed body if the response has been
+        # compressed
+        def uncompress_body(response)
+            case response['content-encoding']
+            when 'gzip'
+                return Zlib::GzipReader.new(StringIO.new(response.body)).read
+            when 'deflate'
+                return Zlib::Inflate.new().inflate(response.body)
+            when nil, 'identity'
+                return response.body
+            else
+                raise Net::HTTPError.new("Unknown content encoding - #{response['content-encoding']}", response)
+            end
+        end
+
+        def uncompress(response)
+            raise Net::HTTPError.new("No block passed") unless block_given?
+
+            case response['content-encoding']
+            when 'gzip','deflate'
+                uncompressor = ZlibAdapter.new
+            when nil, 'identity'
+                uncompressor = IdentityAdapter.new
+            else
+                raise Net::HTTPError.new("Unknown content encoding - #{response['content-encoding']}", response)
+            end
+
+            yield uncompressor
+
+            uncompressor.close
+        end
+
+        def add_accept_encoding(headers={})
+            headers['accept-encoding'] = 'gzip; q=1.0, deflate; q=1.0; identity' if Puppet.settings[:http_compression]
+            headers
+        end
+
+        # This adapters knows how to uncompress both 'zlib' stream (the deflate algorithm from Content-Encoding)
+        # and GZip streams.
+        class ZlibAdapter
+            def initialize
+                # Create an inflater that knows to parse GZip streams and zlib streams.
+                # This uses a property of the C Zlib library, documented as follow:
+                # windowBits can also be greater than 15 for optional gzip decoding. Add
+                # 32 to windowBits to enable zlib and gzip decoding with automatic header
+                # detection, or add 16 to decode only the gzip format (the zlib format will
+                # return a Z_DATA_ERROR). If a gzip stream is being decoded, strm->adler is
+                # a crc32 instead of an adler32.
+                @uncompressor = Zlib::Inflate.new(15 + 32)
+                @first = true
+            end
+
+            def uncompress(chunk)
+                out = @uncompressor.inflate(chunk)
+                @first = false
+                return out
+            rescue Zlib::DataError => z
+                # it can happen that we receive a raw deflate stream
+                # which might make our inflate throw a data error.
+                # in this case, we try with a verbatim (no header)
+                # deflater.
+                @uncompressor = Zlib::Inflate.new
+                retry if @first
+                raise
+            end
+
+            def close
+                @uncompressor.finish
+                @uncompressor.close
+            end
+        end
+    end
+
+    module None
+        def uncompress_body(response)
+            response.body
+        end
+
+        def add_accept_encoding(headers)
+            headers
+        end
+
+        def uncompress(response)
+            yield IdentityAdapter.new
+        end
+    end
+
+    class IdentityAdapter
+        def uncompress(chunk)
+            chunk
+        end
+
+        def close
+        end
+    end
+end
diff --git a/lib/puppet/type/file/content.rb b/lib/puppet/type/file/content.rb
index c162d17..0d6391f 100755
--- a/lib/puppet/type/file/content.rb
+++ b/lib/puppet/type/file/content.rb
@@ -3,12 +3,14 @@ require 'uri'
 require 'puppet/util/checksums'
 require 'puppet/network/http/api/v1'
+require 'puppet/network/http/compression'
 module Puppet
     Puppet::Type.type(:file).newproperty(:content) do
         include Puppet::Util::Diff
         include Puppet::Util::Checksums
         include Puppet::Network::HTTP::API::V1
+        include Puppet::Network::HTTP::Compression.module
         attr_reader :actual_content
@@ -167,13 +169,13 @@ module Puppet
             else
                 request = Puppet::Indirector::Request.new(:file_content, :find, source_or_content.full_path)
                 connection = Puppet::Network::HttpPool.http_instance(source_or_content.server, source_or_content.port)
-                connection.request_get(indirection2uri(request), {"Accept" => "raw"}) do |response|
+                connection.request_get(indirection2uri(request), add_accept_encoding({"Accept" => "raw"})) do |response|
                     case response.code
                     when "404"; nil
-                    when /^2/; response.read_body { |chunk| yield chunk }
+                    when /^2/; uncompress(response) { |uncompressor| response.read_body { |chunk| yield uncompressor.uncompress(chunk) } }
                     else
                         # Raise the http error if we didn't get a 'success' of some kind.
-                        message = "Error %s on SERVER: %s" % [response.code, (response.body||'').empty? ? response.message : response.body]
+                        message = "Error %s on SERVER: %s" % [response.code, (response.body||'').empty? ? response.message : uncompress_body(response)]
                         raise Net::HTTPError.new(message, response)
                     end
                 end
diff --git a/spec/integration/defaults.rb b/spec/integration/defaults.rb
index 1888813..ad3d42b 100755
--- a/spec/integration/defaults.rb
+++ b/spec/integration/defaults.rb
@@ -228,4 +228,8 @@ describe "Puppet defaults" do
     it "should have a 'postrun_command' that defaults to the empty string" do
         Puppet.settings[:postrun_command].should == ""
     end
+
+    it "should have an http_compression setting that defaults to false" do
+        Puppet.settings[:http_compression].should be_false
+    end
 end
diff --git a/spec/unit/indirector/certificate/rest.rb b/spec/unit/indirector/certificate/rest.rb
index a325754..9f272fb 100755
--- a/spec/unit/indirector/certificate/rest.rb
+++ b/spec/unit/indirector/certificate/rest.rb
@@ -47,6 +47,7 @@ rn/G
         response = stub 'response', :code => "200", :body => cert_string
         response.stubs(:[]).with('content-type').returns "text/plain"
+        response.stubs(:[]).with('content-encoding')
         network.expects(:get).returns response
         request = Puppet::Indirector::Request.new(:certificate, :find, "foo.com")
diff --git a/spec/unit/indirector/rest.rb b/spec/unit/indirector/rest.rb
index d12e3c6..0b8fc60 100755
--- a/spec/unit/indirector/rest.rb
+++ b/spec/unit/indirector/rest.rb
@@ -41,6 +41,7 @@ describe Puppet::Indirector::REST do
         @response = stub('mock response', :body => 'result', :code => "200")
         @response.stubs(:[]).with('content-type').returns "text/plain"
+        @response.stubs(:[]).with('content-encoding').returns nil
         @searcher = @rest_class.new
         @searcher.stubs(:model).returns @model
@@ -97,6 +98,7 @@ describe Puppet::Indirector::REST do
                     @response = mock 'response'
                     @response.stubs(:code).returns rc.to_s
+                    @response.stubs(:[]).with('content-encoding').returns nil
                     @response.stubs(:message).returns "There was a problem (header)"
                 end
@@ -119,14 +121,23 @@ describe Puppet::Indirector::REST do
                     @response.stubs(:body).returns nil
                     lambda { @searcher.deserialize(@response) }.should raise_error(Net::HTTPError,"Error #{rc} on SERVER: There was a problem (header)")
                 end
+
+                describe "and with http compression" do
+                    it "should uncompress the body" do
+                        @response.stubs(:body).returns("compressed body")
+                        @searcher.expects(:uncompress_body).with(@response).returns("uncompressed")
+                        lambda { @searcher.deserialize(@response) }.should raise_error { |e| e.message =~ /uncompressed/ }
+                    end
+                end
             end
-        }
+        }
         it "should return the results of converting from the format specified by the content-type header if the response code is in the 200s" do
             @model.expects(:convert_from).with("myformat", "mydata").returns "myobject"
             response = mock 'response'
             response.stubs(:[]).with("content-type").returns "myformat"
+            response.stubs(:[]).with("content-encoding").returns nil
             response.stubs(:body).returns "mydata"
             response.stubs(:code).returns "200"
@@ -138,6 +149,7 @@ describe Puppet::Indirector::REST do
             response = mock 'response'
             response.stubs(:[]).with("content-type").returns "myformat"
+            response.stubs(:[]).with("content-encoding").returns nil
             response.stubs(:body).returns "mydata"
             response.stubs(:code).returns "200"
@@ -149,11 +161,25 @@ describe Puppet::Indirector::REST do
             response = mock 'response'
             response.stubs(:[]).with("content-type").returns "text/plain; charset=utf-8"
+            response.stubs(:[]).with("content-encoding").returns nil
             response.stubs(:body).returns "mydata"
             response.stubs(:code).returns "200"
             @searcher.deserialize(response)
         end
+
+        it "should uncompress the body" do
+            @model.expects(:convert_from).with("myformat", "uncompressed mydata").returns "myobject"
+
+            response = mock 'response'
+            response.stubs(:[]).with("content-type").returns "myformat"
+            response.stubs(:body).returns "compressed mydata"
+            response.stubs(:code).returns "200"
+
+            @searcher.expects(:uncompress_body).with(response).returns("uncompressed mydata")
+
+            @searcher.deserialize(response).should == "myobject"
+        end
     end
     describe "when creating an HTTP client" do
@@ -221,6 +247,13 @@ describe Puppet::Indirector::REST do
             @searcher.find(@request)
         end
+        it "should add Accept-Encoding header" do
+            @searcher.expects(:add_accept_encoding).returns({"accept-encoding" => "gzip"})
+
+            @connection.expects(:get).with { |path, args| args["accept-encoding"] == "gzip" }.returns(@response)
+            @searcher.find(@request)
+        end
+
         it "should deserialize and return the network response" do
             @searcher.expects(:deserialize).with(@response).returns @instance
             @searcher.find(@request).should equal(@instance)
diff --git a/spec/unit/network/http/compression.rb b/spec/unit/network/http/compression.rb
new file mode 100644
index 0000000..9b9a2a5
--- /dev/null
+++ b/spec/unit/network/http/compression.rb
@@ -0,0 +1,199 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../../spec_helper'
+
+describe "http compression" do
+
+    describe "when zlib is not available" do
+        before(:each) do
+            Puppet.features.stubs(:zlib?).returns false
+
+            require 'puppet/network/http/compression'
+            class HttpUncompressor
+                include Puppet::Network::HTTP::Compression::None
+            end
+
+            @uncompressor = HttpUncompressor.new
+        end
+
+        it "should have a module function that returns the None underlying module" do
+            Puppet::Network::HTTP::Compression.module.should == Puppet::Network::HTTP::Compression::None
+        end
+
+        it "should not add any Accept-Encoding header" do
+            @uncompressor.add_accept_encoding({}).should == {}
+        end
+
+        it "should not tamper the body" do
+            response = stub 'response', :body => "data"
+            @uncompressor.uncompress_body(response).should == "data"
+        end
+
+        it "should yield an identity uncompressor" do
+            response = stub 'response'
+            @uncompressor.uncompress(response) { |u|
+                u.should be_instance_of Puppet::Network::HTTP::Compression::IdentityAdapter
+            }
+        end
+    end
+
+    describe "when zlib is available" do
+        confine "Zlib is missing" => Puppet.features.zlib?
+
+        before(:each) do
+            Puppet.features.stubs(:zlib?).returns true
+
+            require 'puppet/network/http/compression'
+            class HttpUncompressor
+                include Puppet::Network::HTTP::Compression::Active
+            end
+
+            @uncompressor = HttpUncompressor.new
+        end
+
+        it "should have a module function that returns the Active underlying module" do
+            Puppet::Network::HTTP::Compression.module.should == Puppet::Network::HTTP::Compression::Active
+        end
+
+        it "should add an Accept-Encoding header when http compression is available" do
+            Puppet.settings.expects(:[]).with(:http_compression).returns(true)
+            headers = @uncompressor.add_accept_encoding({})
+            headers.should have_key('accept-encoding')
+            headers['accept-encoding'].should =~ /gzip/
+            headers['accept-encoding'].should =~ /deflate/
+            headers['accept-encoding'].should =~ /identity/
+        end
+
+        it "should not add Accept-Encoding header if http compression is not available" do
+            Puppet.settings.stubs(:[]).with(:http_compression).returns(false)
+            @uncompressor.add_accept_encoding({}).should == {}
+        end
+
+        describe "when uncompressing response body" do
+            before do
+                @response = stub 'response'
+                @response.stubs(:[]).with('content-encoding')
+                @response.stubs(:body).returns("mydata")
+            end
+
+            it "should return untransformed response body with no content-encoding" do
+                @uncompressor.uncompress_body(@response).should == "mydata"
+            end
+
+            it "should return untransformed response body with 'identity' content-encoding" do
+                @response.stubs(:[]).with('content-encoding').returns('identity')
+                @uncompressor.uncompress_body(@response).should == "mydata"
+            end
+
+            it "should use a Zlib inflater with 'deflate' content-encoding" do
+                @response.stubs(:[]).with('content-encoding').returns('deflate')
+
+                inflater = stub 'inflater'
+                Zlib::Inflate.expects(:new).returns(inflater)
+                inflater.expects(:inflate).with("mydata").returns "uncompresseddata"
+
+                @uncompressor.uncompress_body(@response).should == "uncompresseddata"
+            end
+
+            it "should use a GzipReader with 'gzip' content-encoding" do
+                @response.stubs(:[]).with('content-encoding').returns('gzip')
+
+                io = stub 'io'
+                StringIO.expects(:new).with("mydata").returns io
+
+                reader = stub 'gzip reader'
+                Zlib::GzipReader.expects(:new).with(io).returns(reader)
+                reader.expects(:read).returns "uncompresseddata"
+
+                @uncompressor.uncompress_body(@response).should == "uncompresseddata"
+            end
+        end
+
+        describe "when uncompressing by chunk" do
+            before do
+                @response = stub 'response'
+                @response.stubs(:[]).with('content-encoding')
+
+                @inflater = stub_everything 'inflater'
+                Zlib::Inflate.stubs(:new).returns(@inflater)
+            end
+
+            it "should yield an identity uncompressor with no content-encoding" do
+                @uncompressor.uncompress(@response) { |u|
+                    u.should be_instance_of Puppet::Network::HTTP::Compression::IdentityAdapter
+                }
+            end
+
+            it "should yield an identity uncompressor with 'identity' content-encoding" do
+                @response.stubs(:[]).with('content-encoding').returns 'identity'
+                @uncompressor.uncompress(@response) { |u|
+                    u.should be_instance_of Puppet::Network::HTTP::Compression::IdentityAdapter
+                }
+            end
+
+            %w{gzip deflate}.each do |c|
+                it "should yield a Zlib uncompressor with '#{c}' content-encoding" do
+                    @response.stubs(:[]).with('content-encoding').returns c
+                    @uncompressor.uncompress(@response) { |u|
+                        u.should be_instance_of Puppet::Network::HTTP::Compression::Active::ZlibAdapter
+                    }
+                end
+            end
+
+            it "should close the underlying adapter" do
+                adapter = stub_everything 'adapter'
+                Puppet::Network::HTTP::Compression::IdentityAdapter.expects(:new).returns(adapter)
+
+                adapter.expects(:close)
+                @uncompressor.uncompress(@response) { |u| }
+            end
+        end
+
+        describe "zlib adapter" do
+            before do
+                @inflater = stub_everything 'inflater'
+                Zlib::Inflate.stubs(:new).returns(@inflater)
+                @adapter = Puppet::Network::HTTP::Compression::Active::ZlibAdapter.new
+            end
+
+            it "should initialize the underlying inflater with gzip/zlib header parsing" do
+                Zlib::Inflate.expects(:new).with(15+32)
+                Puppet::Network::HTTP::Compression::Active::ZlibAdapter.new
+            end
+
+            it "should inflate the given chunk" do
+                @inflater.expects(:inflate).with("chunk")
+                @adapter.uncompress("chunk")
+            end
+
+            it "should return the inflated chunk" do
+                @inflater.stubs(:inflate).with("chunk").returns("uncompressed")
+                @adapter.uncompress("chunk").should == "uncompressed"
+            end
+
+            it "should try a 'regular' inflater on Zlib::DataError" do
+                @inflater.expects(:inflate).raises(Zlib::DataError.new "not a zlib stream")
+                inflater = stub_everything 'inflater2'
+                inflater.expects(:inflate).with("chunk").returns("uncompressed")
+                Zlib::Inflate.expects(:new).with().returns(inflater)
+                @adapter.uncompress("chunk")
+            end
+
+            it "should raise the error the second time" do
+                @inflater.expects(:inflate).raises(Zlib::DataError.new "not a zlib stream")
+                Zlib::Inflate.expects(:new).with().returns(@inflater)
+                lambda { @adapter.uncompress("chunk") }.should raise_error
+            end
+
+            it "should finish the stream on close" do
+                @inflater.expects(:finish)
+                @adapter.close
+            end
+
+            it "should close the stream on close" do
+                @inflater.expects(:close)
+                @adapter.close
+            end
+        end
+    end
+end
diff --git a/spec/unit/type/file/content.rb b/spec/unit/type/file/content.rb
index c78a35c..bb6ccb0 100755
--- a/spec/unit/type/file/content.rb
+++ b/spec/unit/type/file/content.rb
@@ -407,6 +407,56 @@ describe content do
                 @sum.expects(:sum_stream).yields(@digest).returns("checksum")
                 @content.write(@fh).should == "checksum"
             end
+
+            it "should get the current accept encoding header value" do
+                @content.expects(:add_accept_encoding)
+                @content.write(@fh)
+            end
+
+            it "should uncompress body on error" do
+                @response.expects(:code).returns("500")
+                @response.expects(:body).returns("compressed body")
+                @content.expects(:uncompress_body).with(@response).returns("uncompressed")
+                lambda { @content.write(@fh) }.should raise_error { |e| e.message =~ /uncompressed/ }
+            end
+
+            it "should uncompress chunk by chunk" do
+                uncompressor = stub_everything 'uncompressor'
+                @content.expects(:uncompress).with(@response).yields(uncompressor)
+                @response.expects(:code).returns("200")
+                @response.expects(:read_body).multiple_yields("chunk1","chunk2")
+
+                uncompressor.expects(:uncompress).with("chunk1").then.with("chunk2")
+                @content.write(@fh)
+            end
+
+            it "should write uncompressed chunks to the file" do
+                uncompressor = stub_everything 'uncompressor'
+                @content.expects(:uncompress).with(@response).yields(uncompressor)
+                @response.expects(:code).returns("200")
+                @response.expects(:read_body).multiple_yields("chunk1","chunk2")
+
+                uncompressor.expects(:uncompress).with("chunk1").returns("uncompressed1")
+                uncompressor.expects(:uncompress).with("chunk2").returns("uncompressed2")
+
+                @fh.expects(:print).with("uncompressed1")
+                @fh.expects(:print).with("uncompressed2")
+
+                @content.write(@fh)
+            end
+
+            it "should pass each uncompressed chunk to the current sum stream" do
+                uncompressor = stub_everything 'uncompressor'
+                @content.expects(:uncompress).with(@response).yields(uncompressor)
+                @response.expects(:code).returns("200")
+                @response.expects(:read_body).multiple_yields("chunk1","chunk2")
+
+                uncompressor.expects(:uncompress).with("chunk1").returns("uncompressed1")
+                uncompressor.expects(:uncompress).with("chunk2").returns("uncompressed2")
+
+                @digest.expects(:<<).with("uncompressed1").then.with("uncompressed2")
+                @content.write(@fh)
+            end
         end
     end
 end
--
1.6.6.1
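
For anyone who wants to try the header auto-detection that ZlibAdapter relies on without a running master, here is a minimal standalone sketch. It is not part of the patch: the helper names inflate_with and inflate_any are hypothetical, Zlib.gzip assumes Ruby 2.4 or later, and the fallback for headerless deflate uses negative window bits, which is a different choice from the patch (the patch retries with a default Zlib::Inflate.new).

```ruby
require 'zlib'

# Inflate data with the given zlib windowBits value and always
# release the underlying stream, even when inflate raises.
def inflate_with(window_bits, data)
  inflater = Zlib::Inflate.new(window_bits)
  begin
    inflater.inflate(data)
  ensure
    inflater.close
  end
end

# Hypothetical helper (not in the patch): try gzip/zlib header
# auto-detection first, then fall back to a headerless deflate stream.
def inflate_any(data)
  inflate_with(Zlib::MAX_WBITS + 32, data)   # 15 + 32: gzip or zlib header
rescue Zlib::DataError
  inflate_with(-Zlib::MAX_WBITS, data)       # negative bits: no header at all
end

payload = "puppet " * 1000

# The three framings a server might send for the same bytes.
gzip_stream = Zlib.gzip(payload)              # Content-Encoding: gzip
zlib_stream = Zlib::Deflate.deflate(payload)  # Content-Encoding: deflate (zlib-wrapped)

raw = Zlib::Deflate.new(Zlib::DEFAULT_COMPRESSION, -Zlib::MAX_WBITS)
raw_stream = raw.deflate(payload, Zlib::FINISH)  # headerless deflate
raw.close

[gzip_stream, zlib_stream, raw_stream].each do |stream|
  puts inflate_any(stream) == payload
end
```

The first two streams are handled by the 15 + 32 inflater alone; only the headerless one goes through the rescue branch.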
