(publishing for the sake of documentation, I'm unlikely to push
 this into yahns.git)

Users may set NO_SLEEPY_PENGUIN in the environment to avoid loading
the sleepy_penguin extension library and use IO.select instead.
This can potentially allow us to run on OSes which support neither
kqueue nor epoll.

The current implementation is somewhat buggy and client expiry
tests sometimes fail.  Maybe it'll be fixed in the future.

This also helps document what we expect from one-shot-based
epoll/kqueue, in terms that any Rubyist familiar with IO.select
should be able to understand.

We'll still declare an explicit dependency on sleepy_penguin since
scalability to idle clients suffers without epoll or kqueue.
---
 lib/yahns.rb                    |   6 +-
 lib/yahns/queue.rb              |  10 ++-
 lib/yahns/queue_quitter.rb      |   4 +-
 lib/yahns/queue_quitter_pipe.rb |   4 +
 lib/yahns/queue_select.rb       | 164 ++++++++++++++++++++++++++++++++++++++++
 lib/yahns/sigevent.rb           |   2 +-
 6 files changed, 182 insertions(+), 8 deletions(-)
 create mode 100644 lib/yahns/queue_select.rb

diff --git a/lib/yahns.rb b/lib/yahns.rb
index d48b34d..5b48d9f 100644
--- a/lib/yahns.rb
+++ b/lib/yahns.rb
@@ -3,7 +3,11 @@
 $stdout.sync = $stderr.sync = true
 
 require 'unicorn' # pulls in raindrops, kgio, fcntl, etc, stringio, and logger
-require 'sleepy_penguin'
+begin
+  raise LoadError, "NO_SLEEPY_PENGUIN" if ENV["NO_SLEEPY_PENGUIN"]
+  require 'sleepy_penguin'
+rescue LoadError
+end
 
 # kill off some unicorn internals we don't need
 # we'll probably just make kcar into a server parser so we don't depend
diff --git a/lib/yahns/queue.rb b/lib/yahns/queue.rb
index 2abdc27..3314e2b 100644
--- a/lib/yahns/queue.rb
+++ b/lib/yahns/queue.rb
@@ -1,7 +1,11 @@
 # Copyright (C) 2013-2014, all contributors
 # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
-if SleepyPenguin.const_defined?(:Epoll)
-  require_relative 'queue_epoll'
+if defined?(SleepyPenguin)
+  if SleepyPenguin.const_defined?(:Epoll)
+    require_relative 'queue_epoll'
+  else
+    require_relative 'queue_kqueue'
+  end
 else
-  require_relative 'queue_kqueue'
+  require_relative 'queue_select'
 end
diff --git a/lib/yahns/queue_quitter.rb b/lib/yahns/queue_quitter.rb
index b9cf28a..2e6857c 100644
--- a/lib/yahns/queue_quitter.rb
+++ b/lib/yahns/queue_quitter.rb
@@ -1,11 +1,9 @@
 # Copyright (C) 2013, Eric Wong <normalper...@yhbt.net> and all contributors
 # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
 
-require 'sleepy_penguin'
-
 # add this as a level-triggered to any thread pool stuck on epoll_wait
 # and watch it die!
-if SleepyPenguin.const_defined?(:EventFD)
+if defined?(SleepyPenguin::EventFD)
   class Yahns::QueueQuitter < Yahns::Sigevent # :nodoc:
     def yahns_step
       Thread.current.exit
diff --git a/lib/yahns/queue_quitter_pipe.rb b/lib/yahns/queue_quitter_pipe.rb
index e18e249..19cc154 100644
--- a/lib/yahns/queue_quitter_pipe.rb
+++ b/lib/yahns/queue_quitter_pipe.rb
@@ -21,4 +21,8 @@ class Yahns::QueueQuitter # :nodoc:
     @reader.close
     @to_io.close
   end
+
+  def closed?
+    @to_io.closed?
+  end
 end
diff --git a/lib/yahns/queue_select.rb b/lib/yahns/queue_select.rb
new file mode 100644
index 0000000..681ded4
--- /dev/null
+++ b/lib/yahns/queue_select.rb
@@ -0,0 +1,164 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2014, all contributors <yahns-public@yhbt.net>
+# License: GPLv3 or later (see COPYING for details)
+require 'thread'
+
+# See lib/yahns/queue_epoll.rb or lib/yahns/queue_kqueue.rb for preferred
+# implementations; this one can be much slower but does not depend on the
+# system having epoll or kqueue support.
+#
+# This class exists partially to document our expectations of an epoll-
+# or kqueue-like notification system for multi-thread-friendly behavior.
+# It is modeled after our understanding of fs/eventpoll.c and dependent
+# components (w.r.t one-shot notification) in the Linux kernel.
+
+# This requires Ruby 2.1 or later
+
+class Yahns::Queue # :nodoc:
+  attr_accessor :fdmap # Yahns::Fdmap
+
+  # public: flags accepted by queue_add and queue_mod
+  QEV_QUIT = -1 # quit flag for queue_add; as an array index, -1 is the writer array inside the IO.select args array
+  QEV_RD = 0 # index of the readers array inside the IO.select args array
+  QEV_WR = 1 # index of the writers array inside the IO.select args array
+
+  def initialize
+    @r, @w = IO.pipe # wakeup pipe: writing to @w makes @r readable for the IO.select in do_watch
+    @ready = Queue.new # analogous to the ready list in Linux epoll
+    @cond = ConditionVariable.new # broadcast by do_watch, waited on by queue_del_sync
+    @watch = {} # analogous to the rbtree in Linux epoll
+    @mtx = Mutex.new # protects @watch and @cond
+    @thr = nil # analogous to softirq processing threads; set to the do_watch thread by worker_thread
+
+    # no-op step for the wakeup pipe: consume one byte, then return :ignore so the worker does nothing more
+    def @r.yahns_step
+      read_nonblock(1, "", exception: false)
+      :ignore
+    end
+  end
+
+  # for HTTP and HTTPS servers, we rely on the io writing to us first
+  # flags: QEV_RD/QEV_WR (usually QEV_RD), or QEV_QUIT for the quitter
+  def queue_add(io, flags)
+    if flags == QEV_QUIT
+      # weird special case for the quitter: it is pushed to @ready once per entry in Thread.list
+      Thread.list.each { @ready.push(io) }
+    else
+      # order is very important here, this thread cannot do anything with
+      # io once we've called queue_mod (our epoll_ctl() analogue) because another thread may use it
+      @fdmap.add(io)
+      queue_mod(io, flags)
+    end
+  end
+
+  def thr_init # per-thread setup: stores an empty String and @fdmap under Thread.current keys
+    Thread.current[:yahns_rbuf] = ""
+    Thread.current[:yahns_fdmap] = @fdmap
+  end
+
+  def do_watch(logger) # watcher loop: IO.select on everything in @watch, moving ready IOs to @ready; never returns
+    begin
+      tmp = [ [], [] ] # [ readers, writers ]
+      @mtx.synchronize do
+        @cond.broadcast # we are not inside IO.select here, so wake any queue_del_sync waiters
+        @watch[@r] = QEV_RD # always (re-)watch the wakeup pipe
+        @watch.delete_if do |io, idx|
+          if io.closed? # closed IOs are dropped from @watch instead of being select-ed on
+            true
+          else
+            # idx is one of: -1, 0, 1 (0 => readers; 1 or -1 => writers)
+            tmp[idx] << io
+            false
+          end
+        end
+      end
+
+      # FIXME: this should not need a timeout, we have a race somewhere (on timeout IO.select returns nil and we loop)
+      tmp = IO.select(*tmp, nil, 0.01) or next
+      # tmp = IO.select(*tmp) or next
+      @mtx.synchronize do
+        @cond.broadcast # IO.select has returned, wake any queue_del_sync waiters
+        tmp.each do |x|
+          x.each do |io|
+            # simulate EPOLLONESHOT behavior which yahns relies on: io stays unwatched until queue_mod re-adds it
+            @watch.delete(io)
+            @ready.push(io)
+          end
+        end
+      end
+    rescue IOError, Errno::EBADF # ignored, the loop retries; NOTE(review): presumably from an IO closed mid-select, confirm
+    rescue => e
+      Yahns::Log.exception(logger, 'select do_watch', e)
+    end while true
+  end
+
+  # use only before hijacking; once hijacked, io may be unusable to us.
+  # It is not safe to call this unless it is an unarmed EPOLLONESHOT
+  # object (here: an io that is not currently in @watch).
+  def queue_del(io)
+    queue_del_sync(io)
+    @fdmap.forget(io)
+  end
+
+  # returns an infinitely running thread, max_events is ignored in this implementation
+  def worker_thread(logger, max_events)
+    @thr ||= Thread.new { do_watch(logger) } # first thread starts watcher
+    Thread.new do
+      thr_init
+      begin
+        io = @ready.shift
+
+        # we cannot delete from the @ready Queue in O(1) time like
+        # the linked-list ready list in the Linux kernel, so
+        # we must skip closed IOs here:
+        next if io.closed?
+
+        # Note: we absolutely must not do anything with io after
+        # we've called queue_mod (our epoll_ctl analogue) on it, io is exclusive to this
+        # thread only until queue_mod is called on it.
+        case rv = io.yahns_step
+        when :wait_readable
+          queue_mod(io, QEV_RD)
+        when :wait_writable
+          queue_mod(io, QEV_WR)
+        when :ignore # only used by rack.hijack (and this internal pipe)
+          # we cannot unwatch after hijacking, the hijacker
+          # may have already closed it.  Likewise, io.fileno is not
+          # expected to work, so we had to erase it from fdmap before hijack
+        when nil, :close
+          # this must be the ONLY place where we call IO#close on
+          # things that got inside the queue
+          queue_del_sync(io)
+          @fdmap.sync_close(io)
+          # n.b. io may still be in the @ready queue here
+        else
+          raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
+        end
+      rescue => e
+        Yahns::Log.exception(logger, 'queue loop', e)
+      end while true
+    end
+  end
+
+  def queue_mod(io, idx)
+    # similar to EPOLL_CTL_MOD (but combined with EPOLL_CTL_ADD)
+    @mtx.synchronize { @watch[io] = idx }
+    @w.write_nonblock('=', exception: false) # wake do_watch so it rebuilds its IO.select arrays from @watch
+  end
+
+  def queue_del_sync(io)
+    # we need to ensure the do_watch thread is not select-ing on io
+    # before returning from this method.  Closing a descriptor which
+    # is being select-ed on does not have well-defined semantics,
+    # so we avoid that by waiting on the ConditionVariable until do_watch broadcasts
+    @mtx.synchronize do
+      @watch.delete(io)
+      @w.write(':') # wake do_watch from IO.select; it broadcasts @cond once it holds @mtx
+      @cond.wait(@mtx) if @thr.alive? # NOTE(review): @thr is nil until worker_thread has run - confirm this is never called earlier
+    end
+  end
+
+  def close
+    # noop: nothing is closed or released here
+  end
+end
diff --git a/lib/yahns/sigevent.rb b/lib/yahns/sigevent.rb
index aa95f4b..d942e7e 100644
--- a/lib/yahns/sigevent.rb
+++ b/lib/yahns/sigevent.rb
@@ -1,6 +1,6 @@
 # Copyright (C) 2013, Eric Wong <normalper...@yhbt.net>
 # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
-if SleepyPenguin.const_defined?(:EventFD)
+if defined?(SleepyPenguin::EventFD)
   require_relative 'sigevent_efd'
 else
   require_relative 'sigevent_pipe'
-- 
EW


Reply via email to