This should make it easier to track state for asynchronous proxy_pass buffering. --- RFC because of the emphasis on "should"...
lib/yahns/http_response.rb | 4 ++-- lib/yahns/wbuf.rb | 12 +++++++----- test/test_wbuf.rb | 7 ++++--- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/lib/yahns/http_response.rb b/lib/yahns/http_response.rb index 0b0296f..1ef2bbf 100644 --- a/lib/yahns/http_response.rb +++ b/lib/yahns/http_response.rb @@ -67,7 +67,7 @@ module Yahns::HttpResponse # :nodoc: alive = Yahns::StreamFile.new(body, alive, offset, count) body = nil end - wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir) + wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir, ret) rv = wbuf.wbuf_write(self, header) body.each { |chunk| rv = wbuf.wbuf_write(self, chunk) } if body wbuf_maybe(wbuf, rv) @@ -199,7 +199,7 @@ module Yahns::HttpResponse # :nodoc: chunk = rv # hope the skb grows when we loop into the trywrite when :wait_writable, :wait_readable if k.output_buffering - wbuf = Yahns::Wbuf.new(body, alive, k.output_buffer_tmpdir) + wbuf = Yahns::Wbuf.new(body, alive, k.output_buffer_tmpdir, rv) rv = wbuf.wbuf_write(self, chunk) break else diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb index 4eed2c5..21dbb08 100644 --- a/lib/yahns/wbuf.rb +++ b/lib/yahns/wbuf.rb @@ -29,14 +29,15 @@ require_relative 'wbuf_common' # to be a scalability issue. class Yahns::Wbuf # :nodoc: include Yahns::WbufCommon + attr_reader :busy - def initialize(body, persist, tmpdir) + def initialize(body, persist, tmpdir, busy) @tmpio = nil @tmpdir = tmpdir @sf_offset = @sf_count = 0 @wbuf_persist = persist # whether or not we keep the connection alive @body = body - @bypass = false + @busy = busy end def wbuf_write(client, buf) @@ -47,8 +48,8 @@ class Yahns::Wbuf # :nodoc: when nil return # yay! 
hopefully we don't have to buffer again when :wait_writable, :wait_readable - @bypass = false # ugh, continue to buffering to file - end while @bypass + @busy = rv + end until @busy @tmpio ||= Yahns::TmpIO.new(@tmpdir) @sf_count += @tmpio.write(buf) @@ -57,6 +58,7 @@ class Yahns::Wbuf # :nodoc: @sf_count -= rv @sf_offset += rv when :wait_writable, :wait_readable + @busy = rv return rv else raise "BUG: #{rv.nil? ? "EOF" : rv.inspect} on tmpio " \ @@ -67,7 +69,7 @@ class Yahns::Wbuf # :nodoc: # to disk if we can help it. @tmpio = @tmpio.close @sf_offset = 0 - @bypass = true + @busy = false nil end diff --git a/test/test_wbuf.rb b/test/test_wbuf.rb index bfdfa2b..e6f8790 100644 --- a/test/test_wbuf.rb +++ b/test/test_wbuf.rb @@ -19,7 +19,8 @@ class TestWbuf < Testcase buf = "*" * (16384 * 2) nr = 1000 [ true, false ].each do |persist| - wbuf = Yahns::Wbuf.new([], persist, Dir.tmpdir) + wbuf = Yahns::Wbuf.new([], persist, Dir.tmpdir, :wait_writable) + assert_equal :wait_writable, wbuf.busy a, b = socketpair assert_nil wbuf.wbuf_write(a, "HIHI") assert_equal "HIHI", b.read(4) @@ -69,7 +70,7 @@ class TestWbuf < Testcase break end while true end - wbuf = Yahns::Wbuf.new([], true, Dir.tmpdir) + wbuf = Yahns::Wbuf.new([], true, Dir.tmpdir, :wait_writable) assert_equal :wait_writable, wbuf.wbuf_write(a, buf) assert_equal :wait_writable, wbuf.wbuf_flush(a) @@ -93,7 +94,7 @@ class TestWbuf < Testcase def test_wbuf_flush_close pipe = cloexec_pipe persist = true - wbuf = Yahns::Wbuf.new(pipe[0], persist, Dir.tmpdir) + wbuf = Yahns::Wbuf.new(pipe[0], persist, Dir.tmpdir, :wait_writable) refute wbuf.respond_to?(:close) # we don't want this for HttpResponse body sp = socketpair rv = nil -- EW