On Wed, Jul 21, 2010 at 7:13 AM, I. E. Smith-Heisters <[email protected]> wrote:
>
> Ok, I finally got something working, but I'm afraid it will be
> brittle. I figured that using a service and notification events would
> be more flexible, produce code with looser coupling with Ruote, and
> provide a path toward better asynchronicity in the future. So, I
> removed the wait code from my participants and added it to a static
> module. I then created a service called "PatientNotifier" like this:
>
> # Wakes a thread that is stopped using `Patient::wait_for`. It will wake the
> # thread in the following circumstances:
> #
> # * when a "dispatch" event is received that has a workitem that matches the
> #   criteria passed to `Patient::wait_for`
> # * when a "reply" event is received, and no further events are received for
> #   up to `listen_for` seconds.
> #
> # This latter circumstance is potentially problematic if some messages take
> # more than `listen_for` seconds to be processed, but it's the only way to
> # detect situations where a "dispatch" won't be sent until something else
> # happens, e.g. when one branch of one or more concurrent branches has ended.
> ---------------------------------------------------------------------------------------------<
> # This is unavoidable given Ruote's design of never knowing what's going to
> # happen next.
> ---------------------------------------------------------------------------------------------<
Framing that one :-)
"embrace the asynchronous"
> class PatientNotifier < ApplicationNotifier
>   class << self
>     def subscribe_to; %w(reply dispatch); end
>     def listen_for; 1; end
>   end
>
>   def initialize *args
>     super *args
>     @lock = Mutex.new
>   end
>
>   def dispatches; @lock.synchronize{@dispatches}; end
>   def dispatches= new_d; @lock.synchronize{@dispatches = new_d}; end
>   def listen_thread; @lock.synchronize{@listen_thread}; end
>   def listen_thread= new_d; @lock.synchronize{@listen_thread = new_d}; end
>
>   def notify msg
>     send "handle_#{msg["action"]}", msg
>   end
>
>   def handle_reply msg
>     start_listening msg
>   end
>
>   def handle_dispatch msg
>     self.dispatches << msg if dispatches
>     continue msg
>   end
>
>   def continue msg
>     wi = Ruote::Workitem.new(msg["workitem"])
>     ::Patient.continue wi
>   end
>
>   def start_listening msg
>     if listen_thread
>       listen_thread.kill
>     end
>     self.listen_thread = Thread.new do
>       listen msg
>     end
>   end
>
>   def listen msg
>     self.dispatches = []
>     sleep self.class.listen_for
>     if self.dispatches.empty?
>       continue msg
>     end
>   ensure
>     self.listen_thread, self.dispatches = nil, nil
>   end
> end
Ouch, I have trouble re-formatting that each time. It looks painful.
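As an aside, the rule in the quoted comment (continue once a "reply" has been followed by `listen_for` quiet seconds, unless a "dispatch" shows up first) can probably be written without killing and respawning a thread on every reply. What follows is only a sketch in plain Ruby, untested against ruote: `QuietPeriod`, `touch` and `cancel` are hypothetical names, not part of ruote or of the code above, and the block stands in for the call to `Patient.continue`.

```ruby
require 'thread'

# Hypothetical helper (not part of ruote): runs the given block once no
# `touch` has been seen for `quiet_for` seconds. One long-lived worker
# thread waits on a condition variable instead of being killed and
# recreated for each event.
class QuietPeriod
  def initialize(quiet_for, &on_quiet)
    @quiet_for = quiet_for
    @on_quiet = on_quiet
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @deadline = nil # nil means "not armed"
    @worker = Thread.new { run }
  end

  # Arm or re-arm the timer; meant to be called for every "reply" event.
  def touch
    @lock.synchronize do
      @deadline = now + @quiet_for
      @cond.signal
    end
  end

  # Disarm the timer; meant to be called when a matching "dispatch" arrives.
  def cancel
    @lock.synchronize do
      @deadline = nil
      @cond.signal
    end
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def run
    loop do
      fire = false
      @lock.synchronize do
        if @deadline.nil?
          @cond.wait(@lock)                # not armed: sleep until touched
        elsif (remaining = @deadline - now) > 0
          @cond.wait(@lock, remaining)     # armed: sleep until the deadline
        else
          @deadline = nil                  # quiet period elapsed
          fire = true
        end
      end
      @on_quiet.call if fire               # run the callback outside the lock
    end
  end
end
```

With something like `timer = QuietPeriod.new(1) { ... }`, the notifier's reply handler would call `timer.touch` and its dispatch handler `timer.cancel`. It still has the weakness the comment names: a message that takes longer than the quiet period will be mistaken for silence.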
> I then call this on my engine: `add_service 'patient_notifier',
> 'notifiers', 'Workflow::PatientNotifier'`
>
> The rest of the code is pretty similar to the code I posted before
> with some slight refactoring to remove it from Participants and put it
> into a singleton Patient module (sans #consume method, of course).
>
> I'm interested to hear your thoughts on this approach.
I read at some point that you'd like to know your position in the flow
in order to not sleep. Maybe this could help :
( gist at http://gist.github.com/483796 )
---8<---
require 'rubygems'
require 'ruote'

engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))

pdef = Ruote.define do
  concurrence do
    alpha
    sequence do
      bravo
      charlie
    end
    delta
  end
end

class MyParticipant
  include Ruote::LocalParticipant

  def consume (workitem)
    p [
      workitem.participant_name,
      :parent_type, parent_type(workitem),
      :last?, last?(workitem)
    ]
    reply(workitem)
  end

  # Returns the name of the parent expression (like 'sequence' or
  # 'concurrence').
  #
  def parent_type (workitem)
    fetch_flow_expression(workitem).parent.name
  end

  # Returns true if the participant expression with this workitem is the
  # last in the list of children of the parent expression
  # (warning, it doesn't mean that much when in a concurrence expression).
  #
  def last? (workitem)
    fexp = fetch_flow_expression(workitem)
    fexp.fei.child_id == fexp.parent.tree[2].length - 1
  end
end

engine.register_participant '.+', MyParticipant

wfid = engine.launch(pdef)

engine.wait_for(wfid)
--->8---
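To make the `tree[2].length - 1` test in `last?` less mysterious: a process tree is a nested `[name, attributes, children]` array, so index 2 holds the children. The sketch below redoes the same check on a hand-written tree, with no ruote involved. The `TREE` literal is my assumption of what the definition above compiles to, and `position` is a hypothetical helper, not a ruote method.

```ruby
# The process definition from the message, written by hand as nested
# [name, attributes, children] arrays (an assumption, not ruote output).
TREE =
  ['define', {}, [
    ['concurrence', {}, [
      ['alpha', {}, []],
      ['sequence', {}, [
        ['bravo', {}, []],
        ['charlie', {}, []]
      ]],
      ['delta', {}, []]
    ]]
  ]]

# Hypothetical helper: follows `path` (a list of child indexes starting at
# the root) and reports the node's name, its parent's name, and whether it
# is the last child of that parent.
def position(tree, path)
  parent = path[0..-2].inject(tree) { |node, i| node[2][i] }
  child_id = path.last
  {
    :name => parent[2][child_id][0],
    :parent_type => parent[0],
    :last? => child_id == parent[2].length - 1
  }
end

p position(TREE, [0, 1, 1]) # charlie, second child of the sequence
p position(TREE, [0, 0])    # alpha, first child of the concurrence
```

As in the participant above, `:last?` says little inside a concurrence, since all branches run at the same time.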
I hope it helps.
--
John Mettraux - http://jmettraux.wordpress.com