On Jul 19, 5:59 pm, John Mettraux <[email protected]> wrote: > On Mon, Jul 19, 2010 at 03:19:23PM -0700, I. E. Smith-Heisters wrote: > > > I posted some code a while back that lets me pause my main thread > > while the Ruote worker thread does work. The Ruote worker wakes the > > main thread up when it's done. The point being that in certain very > > important places I can simulate synchronicity with Ruote so human > > users can be assured that they'll see the results of their (HTTP) > > requests. > > > The code I posted (below) relied on subclassing StorageParticipant and > > overloading the #consume method. Now that we've added concurrency to > > our process definition this approach has an unforeseen (but now > > obvious) shortcoming. When we're in a concurrent branch, the > > completion of one branch does not send the workitem to another > > participant (since we're waiting on the other branch), and #consume is > > never called. > > > I have a couple possible fixes up my sleeve: overriding > > Ruote::Exp::ParticipantExpression#reply to continue the waiting thread > > (hacky), or not waiting in the first place if we're in a concurrent > > branch (inconsistent behavior). However, I was wondering if someone > > might have some suggestions that are less error-prone and better > > supported by the existing interfaces (ie. no monkey-patching). > > Hello, > > maybe you could have a look at Participant#on_reply. It was recently enhanced > to work with "instantiated participants" (participants instantiated at > registration time (I have the impression that it's what you use)).
Nope. I use `register_participant 'foo', Foo`. > > The test case is : > > http://github.com/jmettraux/ruote/blob/ruote2.1/test/functional/ft_43... > > Basically, it's a callback triggered when the workitem reaches back the > participant. on_reply looks functionally identical to what I did by overloading StorageParticipant#reply in a subclass. It's nicer, so I can switch to that, but it still won't solve my problem (see below). > > I'm not sure I fully understand your case. It seems that if there are > multiple workitems in your engine, any of them will wake up the waiting > thread when reaching a storage participant. *All* my code is currently run in response to user requests, and all my participants are storage participants. There's one user request per ruby process, so there's no chance of the waiting thread being woken by a workitem from another ruote process. > > Or, some speculation : what about a participant that explicitely wakes up > your thread ? hmmmm... this seems promising. > > ---8<--- > pdef = Ruote.process_definition do > sequence do > participant 'patient' > do_the_work > participant 'waker' > end > end > > module Bedroom > > @guests = {} > > def self.sleep (label) > @guests[label] = Thread.current > Thread.current.stop > end > > def self.wakeup (label) > t = @guests.delete(label) > t.wakeup > end > end > > class PatientParticipant < Ruote::StorageParticipant > > def reply (workitem) > label = workitem.fei.to_s > workitem.fields['sleeper'] = label > super(workitem) > Bedroom.sleep(label) > end > end > > class WakerParticipant > include Ruote::LocalParticipant > > def consume (workitem) > Bedroom.wakeup(workitem.fields.delete('sleeper')) > reply_to_engine(workitem) > end > end > --->8--- > > (This assumes that the worker is running in the same ruby process as the web > application). > > When the patient participant replies to the engine, the replying thread is > put to sleep. It's woken up a bit later by a specific 'waker' participant. > > You could also rely on something inside "do_the_work" to wake up the thread. > > Sorry if I'm completely off. Not at all, this is exactly the completely new perspective I need. Sorry I was unclear--I was too in the middle of the code to explain it well. Let me try again. Let's take this example pdef, where all participants are storage participants: sequence do concurrence do alpha cursor do bravo charlie end end delta end Now, simplified my user-request code might look like this: # user POST /<wfid>/alpha def process_user_request wfid, pname wi = Workitems.by_wfid_and_pname(wfid, pname) # I've ensured that there can't be two workitems for 'alpha' on the same wfid Participants::Patient.wait_for do Workitems.reply wi end end The ::wait_for code, as seen in my first post, basically puts the current thread to sleep, and causes calls to StorageParticipant#consume to wake it up. So, the following test works: Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo'] process_user_request '<wfid>', 'bravo' Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'charlie'] So, the wait_for ensures not only that the #reply has been processed, but that the next participant has been reached (ie. #consume has been called). Now, take the failing test: Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo'] process_user_request '<wfid>', 'alpha' # difference: replying to the alpha participant Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['bravo'] # FAIL! This fails--the ruby process hangs on the call to #process_user_request. This is because it's waiting for #consume, but bravo's #consume has already been called! Ok, we can add #on_reply to the mix: class PatientParticipant def on_reply wi self.class.continue if fetch_flow_expression(wi).parent.class.to_s =~ /Concurrence/i end end Ok, so this makes the test above pass even though it's obviously dumb. It generally relies on #consume, but in the case of #alpha, will continue the sleeping thread. So even this longer test works: Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo'] process_user_request '<wfid>', 'alpha' Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['bravo'] process_user_request '<wfid>', 'bravo' Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['charlie'] process_user_request '<wfid>', 'charlie' Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['delta'] BUUUT, if you go the other way by replying to bravo first, it fails of course: Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo'] process_user_request '<wfid>', 'bravo' Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'charlie'] process_user_request '<wfid>', 'charlie' # BONK! Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha'] It stalls forever. delta won't receive a dispatch until alpha is done, alpha has already received a dispatch and won't continue the thread, and I have no way to tell that charlie was the last node in a concurrent branch. Clearer? TMI? #on_reply won't work in the general case, when there's sequential execution, because it will wake the sleeping thread before the next participant has received their workitem. So, maybe a WakeupParticipant would work (I like its simplicity)--but it will double the size of all our pdef code (oh well). Another option that's looking increasingly attractive is just to poll the worker to see if it's done. The ideal solution would be to detect when you're at the end of a branch, but that there are other branches pausing execution. The `concurrence :count => x` feature, though, could make that even more difficult... Sorry for writing the Great American Novel. Thanks for your advice and help. -Ian -- you received this message because you are subscribed to the "ruote users" group. to post : send email to [email protected] to unsubscribe : send email to [email protected] more options : http://groups.google.com/group/openwferu-users?hl=en
