Hi all,
I posted some code a while back that lets me pause my main thread
while the Ruote worker thread does work. The Ruote worker wakes the
main thread up when it's done. The point is that, in certain very
important places, I can simulate synchronous behavior with Ruote so
human users can be assured that they'll see the results of their
(HTTP) requests.
The code I posted (below) relies on subclassing StorageParticipant and
overriding the #consume method. Now that we've added concurrency to
our process definition, this approach has an unforeseen (but now
obvious) shortcoming. When we're in a concurrent branch, the
completion of one branch does not send the workitem on to another
participant (since we're still waiting on the other branch), so
#consume is never called.
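To make the failure mode concrete, here is a toy model that uses only
Ruby's standard library (no Ruote involved). ToyWaiter, sequential_case
and concurrent_case are made-up names for illustration, and a
Mutex/ConditionVariable pair stands in for the Thread.stop/wakeup
handshake used in the real code further down. It is a rough sketch of
the behavior, not what Ruote does internally.

```ruby
# Toy stand-in for the waiting mechanism; hypothetical, not Ruote code.
class ToyWaiter
  def initialize
    @mutex  = Mutex.new
    @cond   = ConditionVariable.new
    @done   = false
    @result = nil
  end

  # Called by the "server" thread: block until continue! is called or
  # until +seconds+ have elapsed, whichever comes first.
  def wait_for(seconds)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + seconds
    @mutex.synchronize do
      until @done
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return :timeout if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      @result
    end
  end

  # Called by the "worker" thread; plays the role of #consume.
  def continue!(result)
    @mutex.synchronize do
      @result = result
      @done   = true
      @cond.signal
    end
  end
end

# Sequential flow: the workitem reaches the next participant, so the
# stand-in for #consume runs and wakes the waiter.
def sequential_case
  waiter = ToyWaiter.new
  worker = Thread.new { waiter.continue!(:next_participant) }
  result = waiter.wait_for(1)
  worker.join
  result
end

# Concurrent flow: one branch replies, but the other is still pending,
# so nothing is dispatched and continue! is never called.
def concurrent_case
  waiter = ToyWaiter.new
  worker = Thread.new { :branch_replied } # never calls continue!
  result = waiter.wait_for(0.2)
  worker.join
  result
end
```

In this sketch sequential_case returns :next_participant, while
concurrent_case can only return :timeout, which is the symptom
described above.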
I have a couple of possible fixes up my sleeve: overriding
Ruote::Exp::ParticipantExpression#reply to wake the waiting thread
(hacky), or not waiting in the first place if we're in a concurrent
branch (inconsistent behavior). However, I was wondering if someone
might have some suggestions that are less error-prone and better
supported by the existing interfaces (i.e. no monkey-patching).
Thanks,
Ian
--
I've edited this some to omit extraneous details, so it's untested as
shown:
require 'timeout'

module Participants
  class Patient < Ruote::StorageParticipant
    class NestedWaitForError < StandardError; end

    # [thread, signal] while a server thread is waiting, nil otherwise
    cattr_accessor :waiting

    class << self
      # run in the server thread
      # => timeout worker response after a few seconds so users don't see
      #    hanging when an error happens
      def wait_for(signal = :next, &block)
        raise NestedWaitForError, "nesting wait_for is not currently supported" if self.waiting
        begin
          self.waiting = [Thread.current, signal]
          caller_result = yield
          if waiting
            begin
              Timeout::timeout(timeout) { Thread.stop }
            rescue Timeout::Error
              # worker thread didn't call +continue+ before the timeout expired
              Rails.logger.info 'Timeout while waiting for worker'
              Thread.current['__patience_result__'] = :timeout
            end
          end
          Thread.current['__patience_result__']
        ensure
          self.waiting = nil
        end
      end

      # run in the worker thread
      def continue(workitem)
        return unless waiting
        thread, waiting_for = *waiting
        signals = [:next, workitem.participant_name]
        if signals.include?(waiting_for)
          continue!(workitem)
        end
      end

      def continue!(result)
        return unless waiting
        thread = waiting.first
        thread['__patience_result__'] = result
        self.waiting = nil
        thread.wakeup
      end

      # seconds to wait for worker to consume the workitem
      def timeout
        10
      end
    end

    def consume(workitem)
      super workitem
      self.class.continue workitem.dup
    end
  end
end