Sorry, Google Groups was munging my code formatting. Here's my refactored half-solution, nicely formatted: http://gist.github.com/486273

I say half-solution because it may need to be iterated on a little, since it appears to be unstable when the `listen` functionality is triggered. The `listen` is the only way I could get it to resume at the end of a concurrent branch.

The idea with this approach is that you can swap out Patient for something like Comet and push events directly to a user over HTTP. Having the HTTP server thread do the waiting is just a cheap substitute for serving the user a page that waits for Ruote events.

-ISH

On Jul 20, 5:00 pm, John Mettraux <[email protected]> wrote:
> On Wed, Jul 21, 2010 at 7:13 AM, I. E. Smith-Heisters <[email protected]> wrote:
>
> > Ok, I finally got something working, but I'm afraid it will be
> > brittle. I figured that using a service and notification events would
> > be more flexible, produce code with looser coupling with Ruote, and
> > provide a path toward better asynchronicity in the future. So, I
> > removed the wait code from my participants and added it to a static
> > module. I then created a service called "PatientNotifier" like this:
> >
> > # Wakes a thread that is stopped using `Patient::wait_for`. It will wake the
> > # thread in the following circumstances:
> > #
> > # * when a "dispatch" event is received that has a workitem that matches the
> > #   criteria passed to `Patient::wait_for`
> > # * when a "reply" event is received, and no further events are received for
> > #   up to a `listen_for` seconds.
> > #
> > # This latter circumstance is potentially problematic if some messages take
> > # more than `listen_for` seconds to be processed, but it's the only way to
> > # detect situations where a "dispatch" won't be sent until something else
> > # happens, eg. when one branch of one or more concurrent branches has ended.
> > --------------------------------------------------------------------------------------------- <
> > # This is unavoidable given Ruote's design of never knowing what's going to
> > # happen next.
> > --------------------------------------------------------------------------------------------- <
>
> Framing that one :-)
>
>   "embrace the asynchronous"
>
> > class PatientNotifier < ApplicationNotifier
> >   class << self
> >     def subscribe_to; %w(reply dispatch); end
> >     def listen_for; 1; end
> >   end
> >
> >   def initialize *args
> >     super *args
> >     ...@lock = Mutex.new
> >   end
> >
> >   def dispatches; @lock.synchroni...@dispatches}; end
> >   def dispatches= new_d; @lock.synchroni...@dispatches = new_d}; end
> >   def listen_thread; @lock.synchroni...@listen_thread}; end
> >   def listen_thread= new_d; @lock.synchroni...@listen_thread = new_d}; end
> >
> >   def notify msg
> >     send "handle_#{msg["action"]}", msg
> >   end
> >
> >   def handle_reply msg
> >     start_listening msg
> >   end
> >
> >   def handle_dispatch msg
> >     self.dispatches << msg if dispatches
> >     continue msg
> >   end
> >
> >   def continue msg
> >     wi = Ruote::Workitem.new(msg["workitem"])
> >     ::Patient.continue wi
> >   end
> >
> >   def start_listening msg
> >     if listen_thread
> >       listen_thread.kill
> >     end
> >     self.listen_thread = Thread.new do
> >       listen msg
> >     end
> >   end
> >
> >   def listen msg
> >     self.dispatches = []
> >     sleep self.class.listen_for
> >     if self.dispatches.empty?
> >       continue msg
> >     end
> >   ensure
> >     self.listen_thread, self.dispatches = nil, nil
> >   end
> > end
>
> Ouch, I have trouble re-formatting that each time. It looks painful.
>
> > I then call this on my engine: `add_service 'patient_notifier',
> > 'notifiers', 'Workflow::PatientNotifier'`
> >
> > The rest of the code is pretty similar to the code I posted before
> > with some slight refactoring to remove it from Participants and put it
> > into a singleton Patient module (sans #consume method, of course).
> >
> > I'm interested to hear your thoughts on this approach.
>
> I read at some point that you'd like to know your position in the flow
> in order to not sleep. Maybe this could help :
>
>   ( gist at http://gist.github.com/483796 )
>
> ---8<---
> require 'rubygems'
> require 'ruote'
>
> engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))
>
> pdef = Ruote.define do
>   concurrence do
>     alpha
>     sequence do
>       bravo
>       charlie
>     end
>     delta
>   end
> end
>
> class MyParticipant
>   include Ruote::LocalParticipant
>
>   def consume (workitem)
>     p [
>       workitem.participant_name,
>       :parent_type, parent_type(workitem),
>       :last?, last?(workitem)
>     ]
>     reply(workitem)
>   end
>
>   # Returns the name of the parent expression (like 'sequence' or
>   # 'concurrence').
>   #
>   def parent_type (workitem)
>     fetch_flow_expression(workitem).parent.name
>   end
>
>   # Returns true if the participant expression with this workitem is the
>   # last in the list of children of the parent expression
>   # (warning, it doesn't mean that much when in a concurrence expression).
>   #
>   def last? (workitem)
>     fexp = fetch_flow_expression(workitem)
>     fexp.fei.child_id == fexp.parent.tree[2].length - 1
>   end
> end
>
> engine.register_participant '.+', MyParticipant
>
> wfid = engine.launch(pdef)
>
> engine.wait_for(wfid)
> --->8---
>
> I hope it helps.
>
> --
> John Mettraux - http://jmettraux.wordpress.com

--
you received this message because you are subscribed to the "ruote users" group.
to post : send email to [email protected]
to unsubscribe : send email to [email protected]
more options : http://groups.google.com/group/openwferu-users?hl=en
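
For anyone following the thread, here is a minimal sketch of the quiet-period idea behind `listen` (continue only if no "dispatch" arrives within `listen_for` seconds of a "reply"), in plain Ruby with no ruote dependency. The class name `QuietPeriodNotifier`, its `reply`/`dispatch` methods and the callback block are all hypothetical and are not part of ruote or of the gist. Instead of `Thread#kill`, it uses a generation counter so that superseded timers simply do nothing; whether that would actually cure the instability mentioned above is an assumption, not something tested against ruote.

```ruby
# Hypothetical stand-in for the notifier; nothing here is ruote API.
class QuietPeriodNotifier
  # quiet_for: seconds of silence required after a "reply".
  # on_quiet:  block called with the reply message once that silence elapses.
  def initialize(quiet_for, &on_quiet)
    @quiet_for  = quiet_for
    @on_quiet   = on_quiet
    @lock       = Mutex.new
    @generation = 0
  end

  # Call for every "reply" event. Starts a timer thread and returns it.
  # Any timer started earlier becomes stale because the generation moves on.
  def reply(msg)
    gen = @lock.synchronize { @generation += 1 }
    Thread.new do
      sleep @quiet_for
      # Fire only if no reply or dispatch has arrived since this timer began.
      still_current = @lock.synchronize { gen == @generation }
      @on_quiet.call(msg) if still_current
    end
  end

  # Call for every "dispatch" event. Invalidates any pending timer.
  def dispatch(_msg)
    @lock.synchronize { @generation += 1 }
    nil
  end
end
```

The callback runs outside the lock, so a dispatch arriving in the instant between the check and the call can still slip through; that small window is the price of not holding the mutex while user code runs. In the setup quoted above, the block would be whatever wakes the waiting thread (the role `::Patient.continue` plays in the quoted class).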
