Been playing with a "wakeup" participant to no avail. It suffers from the same problem as #on_reply in that it resumes the waiting thread before the next storage participant has received the workitem.
Polling doesn't work because I can't find a good interface for asking the worker "do you have more work to do?". I had thought Worker#inactive? would do this, but it will return false as long as there are wfids in the DB. -ISH On Jul 19, 10:00 pm, "I. E. Smith-Heisters" <[email protected]> wrote: > On Jul 19, 5:59 pm, John Mettraux <[email protected]> wrote: > > > > > > > On Mon, Jul 19, 2010 at 03:19:23PM -0700, I. E. Smith-Heisters wrote: > > > > I posted some code a while back that lets me pause my main thread > > > while the Ruote worker thread does work. The Ruote worker wakes the > > > main thread up when it's done. The point being that in certain very > > > important places I can simulate synchronicity with Ruote so human > > > users can be assured that they'll see the results of their (HTTP) > > > requests. > > > > The code I posted (below) relied on subclassing StorageParticipant and > > > overloading the #consume method. Now that we've added concurrency to > > > our process definition this approach has an unforeseen (but now > > > obvious) shortcoming. When we're in a concurrent branch, the > > > completion of one branch does not send the workitem to another > > > participant (since we're waiting on the other branch), and #consume is > > > never called. > > > > I have a couple possible fixes up my sleeve: overriding > > > Ruote::Exp::ParticipantExpression#reply to continue the waiting thread > > > (hacky), or not waiting in the first place if we're in a concurrent > > > branch (inconsistent behavior). However, I was wondering if someone > > > might have some suggestions that are less error-prone and better > > > supported by the existing interfaces (ie. no monkey-patching). > > > Hello, > > > maybe you could have a look at Participant#on_reply. It was recently > > enhanced to work with "instantiated participants" (participants > > instantiated at registration time (I have the impression that it's what you > > use)). > > Nope. I use `register_participant 'foo', Foo`. > > > > > The test case is : > > > http://github.com/jmettraux/ruote/blob/ruote2.1/test/functional/ft_43... > > > Basically, it's a callback triggered when the workitem reaches back the > > participant. > > on_reply looks functionally identical to what I did by overloading > StorageParticipant#reply in a subclass. It's nicer, so I can switch to > that, but it still won't solve my problem (see below). > > > > > I'm not sure I fully understand your case. It seems that if there are > > multiple workitems in your engine, any of them will wake up the waiting > > thread when reaching a storage participant. > > *All* my code is currently run in response to user requests, and all > my participants are storage participants. There's one user request per > ruby process, so there's no chance of the waiting thread being woken > by a workitem from another ruote process. > > > > > Or, some speculation : what about a participant that explicitely wakes up > > your thread ? > > hmmmm... this seems promising. > > > > > > > > > ---8<--- > > pdef = Ruote.process_definition do > > sequence do > > participant 'patient' > > do_the_work > > participant 'waker' > > end > > end > > > module Bedroom > > > @guests = {} > > > def self.sleep (label) > > @guests[label] = Thread.current > > Thread.current.stop > > end > > > def self.wakeup (label) > > t = @guests.delete(label) > > t.wakeup > > end > > end > > > class PatientParticipant < Ruote::StorageParticipant > > > def reply (workitem) > > label = workitem.fei.to_s > > workitem.fields['sleeper'] = label > > super(workitem) > > Bedroom.sleep(label) > > end > > end > > > class WakerParticipant > > include Ruote::LocalParticipant > > > def consume (workitem) > > Bedroom.wakeup(workitem.fields.delete('sleeper')) > > reply_to_engine(workitem) > > end > > end > > --->8--- > > > (This assumes that the worker is running in the same ruby process as the > > web application). > > > When the patient participant replies to the engine, the replying thread is > > put to sleep. It's woken up a bit later by a specific 'waker' participant. > > > You could also rely on something inside "do_the_work" to wake up the thread. > > > Sorry if I'm completely off. > > Not at all, this is exactly the completely new perspective I need. > Sorry I was unclear--I was too in the middle of the code to explain it > well. Let me try again. > > Let's take this example pdef, where all participants are storage > participants: > > sequence do > concurrence do > alpha > cursor do > bravo > charlie > end > end > delta > end > > Now, simplified my user-request code might look like this: > > # user POST /<wfid>/alpha > def process_user_request wfid, pname > wi = Workitems.by_wfid_and_pname(wfid, pname) # I've ensured that > there can't be two workitems for 'alpha' on the same wfid > Participants::Patient.wait_for do > Workitems.reply wi > end > end > > The ::wait_for code, as seen in my first post, basically puts the > current thread to sleep, and causes calls to > StorageParticipant#consume to wake it up. So, the following test > works: > > Workitems.by_wfid('<wfid>').map(&:participant_name).should == > ['alpha', 'bravo'] > process_user_request '<wfid>', 'bravo' > Workitems.by_wfid('<wfid>').map(&:participant_name).should == > ['alpha', 'charlie'] > > So, the wait_for ensures not only that the #reply has been processed, > but that the next participant has been reached (ie. #consume has been > called). > > Now, take the failing test: > > Workitems.by_wfid('<wfid>').map(&:participant_name).should == > ['alpha', 'bravo'] > process_user_request '<wfid>', 'alpha' # difference: replying to the > alpha participant > Workitems.by_wfid('<wfid>').map(&:participant_name).should == > ['bravo'] # FAIL! > > This fails--the ruby process hangs on the call to > #process_user_request. This is because it's waiting for #consume, but > bravo's #consume has already been called! > > Ok, we can add #on_reply to the mix: > > class PatientParticipant > def on_reply wi > self.class.continue if > fetch_flow_expression(wi).parent.class.to_s =~ /Concurrence/i > end > end > > Ok, so this makes the test above pass even though it's obviously dumb. > It generally relies on #consume, but in the case of #alpha, will > continue the sleeping thread. So even this longer test works: > > Workitems.by_wfid('<wfid>').map(&:participant_name).should == > ['alpha', 'bravo'] > process_user_request '<wfid>', 'alpha' > Workitems.by_wfid('<wfid>').map(&:participant_name).should == > ['bravo'] > process_user_request '<wfid>', 'bravo' > Workitems.by_wfid('<wfid>').map(&:participant_name).should == > ['charlie'] > process_user_request '<wfid>', 'charlie' > Workitems.by_wfid('<wfid>').map(&:participant_name).should == > ['delta'] > > BUUUT, if you go the other way by replying to bravo first, it fails of > course: > > Workitems.by_wfid('<wfid>').map(&:participant_name).should == > ['alpha', 'bravo'] > process_user_request '<wfid>', 'bravo' > Workitems.by_wfid('<wfid>').map(&:participant_name).should == > ['alpha', 'charlie'] > process_user_request '<wfid>', 'charlie' # BONK! > Workitems.by_wfid('<wfid>').map(&:participant_name).should == > ['alpha'] > > It stalls forever. delta won't receive a dispatch until alpha is done, > alpha has already received a dispatch and won't continue the thread, > and I have no way to tell that charlie was the last node in a > concurrent branch. > > Clearer? TMI? > > #on_reply won't work in the general case, when there's sequential > execution, because it will wake the sleeping thread before the next > participant has received their workitem. So, maybe a WakeupParticipant > would work (I like its simplicity)--but it will double the size of all > our pdef code (oh well). Another option that's looking increasingly > attractive is just to poll the worker to see if it's done. The ideal > solution would be to detect when you're at the end of a branch, but > that there are other branches pausing execution. The > `concurrence :count => x` feature, though, could make that even more > difficult... > > Sorry for writing the Great American Novel. Thanks for your advice and > help. > > -Ian -- you received this message because you are subscribed to the "ruote users" group. to post : send email to [email protected] to unsubscribe : send email to [email protected] more options : http://groups.google.com/group/openwferu-users?hl=en
