Hi all,

I posted some code a while back that lets me pause my main thread
while the Ruote worker thread does work. The Ruote worker wakes the
main thread up when it's done. The point being that in certain very
important places I can simulate synchronicity with Ruote so human
users can be assured that they'll see the results of their (HTTP)
requests.

The code I posted (below) relied on subclassing StorageParticipant and
overloading the #consume method. Now that we've added concurrency to
our process definition this approach has an unforeseen (but now
obvious) shortcoming. When we're in a concurrent branch, the
completion of one branch does not send the workitem to another
participant (since we're waiting on the other branch), and #consume is
never called.

I have a couple possible fixes up my sleeve: overriding
Ruote::Exp::ParticipantExpression#reply to continue the waiting thread
(hacky), or not waiting in the first place if we're in a concurrent
branch (inconsistent behavior). However, I was wondering if someone
might have some suggestions that are less error-prone and better
supported by the existing interfaces (ie. no monkey-patching).

Thanks,
Ian

--
I've edited this some to omit extraneous details, so it's untested as
shown:


module Participants
  class Patient < Ruote::StorageParticipant
    class NestedWaitForError < StandardError; end
    cattr_accessor :waiting
    class << self
      # run in the server thread
      # => timeout worker response after a few seconds so users don't
see
      # hanging when an error happens
      def wait_for signal=:next, &block
        raise NestedWaitForError, "nesting wait_for is not currently
supported" if self.waiting
        begin
          self.waiting = [Thread.current, signal]
          caller_result = yield
          if waiting
            begin
              Timeout::timeout(timeout){ Thread.stop }
            rescue Timeout::Error # worker thread didn't call +continue
+ before the timeout expired
              Rails.logger.info 'Timeout while waiting for worker'
              Thread.current['__patience_result__'] = :timeout
            end
          end
          Thread.current['__patience_result__']
 
ensure
          self.waiting =
nil
 
end
 
end

      # run in the worker
thread
      def continue
workitem
        return unless
waiting
        thread, waiting_for =
*waiting
        signals = [:next,
workitem.participant_name]
        if signals.include?
waiting_for
          continue!
workitem
 
end
 
end

      def continue!
result
        return unless
waiting
        thread =
waiting.first
        thread['__patience_result__'] =
result
        self.waiting =
nil
 
thread.wakeup
 
end

      # seconds to wait for worker to consume the
workitem
      def
timeout
 
10
 
end
 
end

    def consume workitem
      super workitem
      self.class.continue workitem.dup
    end
  end
end

-- 
you received this message because you are subscribed to the "ruote users" group.
to post : send email to [email protected]
to unsubscribe : send email to [email protected]
more options : http://groups.google.com/group/openwferu-users?hl=en

Reply via email to