PROTON-1803: [ruby] Container support for scheduled tasks
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/02f49551 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/02f49551 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/02f49551 Branch: refs/heads/go1 Commit: 02f495510cb9f5e5308393fe8bb8e44df8ecebcd Parents: 1108c4e Author: Alan Conway <acon...@redhat.com> Authored: Thu Mar 22 16:01:05 2018 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Fri Mar 23 09:39:39 2018 -0400 ---------------------------------------------------------------------- proton-c/bindings/ruby/lib/core/container.rb | 115 ++++++++++++++++---- proton-c/bindings/ruby/lib/qpid_proton.rb | 1 + proton-c/bindings/ruby/lib/util/schedule.rb | 63 +++++++++++ proton-c/bindings/ruby/tests/test_container.rb | 40 ++++++- 4 files changed, 194 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/core/container.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/container.rb b/proton-c/bindings/ruby/lib/core/container.rb index 2d920b4..c581d34 100644 --- a/proton-c/bindings/ruby/lib/core/container.rb +++ b/proton-c/bindings/ruby/lib/core/container.rb @@ -21,6 +21,8 @@ require 'set' require_relative 'listener' module Qpid::Proton + public + # An AMQP container manages a set of {Listener}s and {Connection}s which # contain {#Sender} and {#Receiver} links to transfer messages. Usually, each # AMQP client or server process has a single container for all of its @@ -29,6 +31,8 @@ module Qpid::Proton # One or more threads can call {#run}, events generated by all the listeners and # connections will be dispatched in the {#run} threads. class Container + include TimeCompare + # Error raised if the container is used after {#stop} has been called. class StoppedError < RuntimeError def initialize(*args) super("container has been stopped"); end @@ -62,16 +66,17 @@ module Qpid::Proton # Implementation note: # - # - #run threads take work from @work - # - Each driver and the Container itself is processed by at most one #run thread at a time - # - The Container thread does IO.select + # - #run threads take work items from @work, process them, and rearm them for select + # - work items are: ConnectionTask, ListenTask, :start, :select, :schedule # - nil on the @work queue makes a #run thread exit @work = Queue.new @work << :start - @work << self # Issue start and start start selecting + @work << :select @wake = SelectWaker.new # Wakes #run thread in IO.select @auto_stop = true # Stop when @active drops to 0 + @schedule = Schedule.new + @schedule_working = false # True if :schedule is on the work queue # Following instance variables protected by lock @lock = Mutex.new @@ -234,6 +239,17 @@ module Qpid::Proton @wake.wake end + # Schedule code to be executed after a delay. + # @param delay [Numeric] delay in seconds, must be >= 0 + # @yield [ ] the block is invoked with no parameters in a {#run} thread after +delay+ has elapsed + def schedule(delay, &block) + delay >= 0.0 or raise ArgumentError, "delay=#{delay} must be >= 0" + block_given? or raise ArgumentError, "no block given" + not_stopped + @lock.synchronize { @active += 1 } if @schedule.add(Time.now + delay, &block) + @wake.wake + end + private # Container driver applies options and adds container context to events @@ -288,6 +304,8 @@ module Qpid::Proton STDERR.puts "#{@log_prefix}#{([method[3..-1].upcase]+args).join ', '})" if @log_prefix @handler.__send__(method, self, *args) if @handler && @handler.respond_to?(method) end + + def next_tick() nil; end end # Selectable object that can be used to wake IO.select from another thread @@ -329,35 +347,44 @@ module Qpid::Proton when :start @adapter.on_container_start(self) if @adapter.respond_to? :on_container_start - when Container + when :select + # Compute read/write select sets and minimum next_tick for select timeout r, w = [@wake], [] - next_tick = nil + next_tick = @schedule.next_tick @lock.synchronize do @selectable.each do |s| r << s if s.send :can_read? w << s if s.send :can_write? - next_tick = next_tick_min(s, next_tick) + next_tick = earliest(s.next_tick, next_tick) end end + now = Time.now timeout = ((next_tick > now) ? next_tick - now : 0) if next_tick r, w = IO.select(r, w, nil, timeout) now = Time.now - selected = Set.new(r).delete(@wake) + @wake.reset if r && r.delete(@wake) + + # selected is a Set to eliminate duplicates between r, w and next_tick due. + selected = Set.new + selected.merge(r) if r selected.merge(w) if w - selected.merge(@selectable.select { |s| next_tick_due(s, now) }) - @wake.reset - stop_select = nil @lock.synchronize do - if stop_select = @stopped # close everything - selected += @selectable - selected.each { |s| s.close @stop_err } + if @stopped # close everything + @selectable.each { |s| s.close @stop_err; @work << s } + @selectable.clear @wake.close + return end - @selectable -= selected # Remove selected tasks + if !@schedule_working && before_eq(@schedule.next_tick, now) + @schedule_working = true + @work << :schedule + end + selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) }) + @selectable -= selected # Remove selected tasks from @selectable end selected.each { |s| @work << s } # Queue up tasks needing #process - @work << self unless stop_select + @work << :select # Enable next select when ConnectionTask then maybe_panic { task.process } @@ -367,17 +394,56 @@ module Qpid::Proton io, opts = maybe_panic { task.process } add(connection_driver(io, opts, true)) if io rearm task - end - end - def next_tick_due(x, now) - nt = x.respond_to?(:next_tick) && x.next_tick - nt && (nt <= now) + when :schedule then + if maybe_panic { @schedule.process Time.now } + @lock.synchronize { @active -= 1; check_stop_lh } + else + @lock.synchronize { @schedule_working = false } + end + end end - def next_tick_min(x, t) - nt = x.respond_to?(:next_tick) && x.next_tick - nt if !t || (nt < t) + def do_select + # Compute the sets to select for read and write, and the minimum next_tick for the timeout + r, w = [@wake], [] + next_tick = nil + @lock.synchronize do + @selectable.each do |s| + r << s if s.can_read? + w << s if s.can_write? + next_tick = earliest(s.next_tick, next_tick) + end + end + next_tick = earliest(@schedule.next_tick, next_tick) + + # Do the select and queue up all resulting work + now = Time.now + timeout = next_tick - now if next_tick + r, w = (timeout.nil? || timeout > 0) && IO.select(r, w, nil, timeout) + @wake.reset + selected = Set.new + @lock.synchronize do + if @stopped + @selectable.each { |s| s.close @stop_err; @work << s } + @wake.close + return + end + # Check if schedule has items due and is not already working + if !@schedule_working && before_eq(@schedule.next_tick, now) + @work << :schedule + @schedule_working = true + end + # Eliminate duplicates between r, w and next_tick due. + selected.merge(r) if r + selected.delete(@wake) + selected.merge(w) if w + @selectable -= selected + selected.merge(@selectable.select { |s| before_eq(s.next_tick, now) }) + @selectable -= selected + end + selected.each { |s| @work << s } # Queue up tasks needing #process + @work << :select end # Rescue any exception raised by the block and stop the container. @@ -386,6 +452,7 @@ module Qpid::Proton yield rescue Exception => e stop(nil, e) + nil end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/qpid_proton.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb index c52310b..21a15b9 100644 --- a/proton-c/bindings/ruby/lib/qpid_proton.rb +++ b/proton-c/bindings/ruby/lib/qpid_proton.rb @@ -44,6 +44,7 @@ require "core/exceptions" require "util/deprecation" require "util/version" require "util/error_handler" +require "util/schedule" require "util/wrapper" # Types http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/lib/util/schedule.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/util/schedule.rb b/proton-c/bindings/ruby/lib/util/schedule.rb new file mode 100644 index 0000000..ef80c1b --- /dev/null +++ b/proton-c/bindings/ruby/lib/util/schedule.rb @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +module Qpid::Proton + + # @private + module TimeCompare + # t1 <= t2, where nil is treated as "distant future" + def before_eq(t1, t2) (t1 && t2) ? (t1 <= t2) : t1; end + + # min(t1, t2) where nil is treated as "distant future" + def earliest(t1, t2) before_eq(t1, t2) ? t1 : t2; end + end + + # @private + # A sorted, thread-safe list of scheduled Proc. + # Note: calls to #process are always serialized, but calls to #add may be concurrent. + class Schedule + include TimeCompare + Item = Struct.new(:time, :proc) + + def initialize() @lock = Mutex.new; @items = []; end + + def next_tick() @lock.synchronize { @items.empty? ? nil : @items.first.time } end + + # @return true if the Schedule was previously empty + def add(time, &proc) + @lock.synchronize do + if at = (0...@items.size).bsearch { |i| @items[i].time > time } + @items.insert(at, Item.new(time, proc)) + else + @items << Item.new(time, proc) + end + return @items.size == 1 + end + end + + # @return true if the Schedule became empty as a result of this call + def process(now) + due = [] + empty = @lock.synchronize do + due << @items.shift while !@items.empty? && before_eq(@items.first.time, now) + @items.empty? + end + due.each { |i| i.proc.call() } + return empty && !due.empty? + end + end +end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/02f49551/proton-c/bindings/ruby/tests/test_container.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/tests/test_container.rb b/proton-c/bindings/ruby/tests/test_container.rb index faa505a..c9f54cc 100644 --- a/proton-c/bindings/ruby/tests/test_container.rb +++ b/proton-c/bindings/ruby/tests/test_container.rb @@ -300,5 +300,43 @@ class ContainerTest < MiniTest::Test assert_raises(Container::StoppedError) { cont.run } assert_raises(Container::StoppedError) { cont.listen "" } end -end + # Make sure Container::Scheduler puts tasks in proper order. + def test_scheduler + a = [] + s = Schedule.new + + assert_equal true, s.add(Time.at 3) { a << 3 } + assert_equal false, s.process(Time.at 2) # Should not run + assert_equal [], a + assert_equal true, s.process(Time.at 3) # Should run + assert_equal [3], a + + a = [] + assert_equal true, s.add(Time.at 3) { a << 3 } + assert_equal false, s.add(Time.at 5) { a << 5 } + assert_equal false, s.add(Time.at 1) { a << 1 } + assert_equal false, s.add(Time.at 4) { a << 4 } + assert_equal false, s.add(Time.at 4) { a << 4.1 } + assert_equal false, s.add(Time.at 4) { a << 4.2 } + assert_equal false, s.process(Time.at 4) + assert_equal [1, 3, 4, 4.1, 4.2], a + a = [] + assert_equal true, s.process(Time.at 5) + assert_equal [5], a + end + + def test_container_schedule + c = Container.new __method__ + delays = [0.1, 0.03, 0.02, 0.04] + a = [] + delays.each { |d| c.schedule(d) { a << [d, Time.now] } } + start = Time.now + c.run + delays.sort.each do |d| + x = a.shift + assert_equal d, x[0] + assert_in_delta start + d, x[1], 0.01, "#{d}" + end + end +end --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org