Been playing with a "wakeup" participant to no avail. It suffers from
the same problem as #on_reply in that it resumes the waiting thread
before the next storage participant has received the workitem.
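
Roughly what I tried (paraphrased, untested; Participants::Patient.continue
here stands in for whatever it is that wakes the request thread):

  class WakeupParticipant
    include Ruote::LocalParticipant

    def consume (workitem)
      # wake the sleeping request thread
      Participants::Patient.continue
      # ...but at this point there's no guarantee that the *next*
      # storage participant in the flow has stored its workitem yet,
      # so the request thread can resume too early
      reply_to_engine(workitem)
    end
  end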

Polling doesn't work because I can't find a good interface for asking
the worker "do you have more work to do?". I had thought
Worker#inactive? would do this, but it will return false as long as
there are wfids in the DB.
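
For the record, the shape of the polling I had in mind; the predicate is
exactly the part I can't find an interface for (worker_busy? below is
purely hypothetical):

  def wait_for_worker (worker, timeout = 10)
    started = Time.now
    while worker_busy?(worker) # hypothetical predicate, see above
      raise 'timed out waiting for the worker' if Time.now - started > timeout
      sleep 0.1
    end
  end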

-ISH

On Jul 19, 10:00 pm, "I. E. Smith-Heisters" <[email protected]> wrote:
> On Jul 19, 5:59 pm, John Mettraux <[email protected]> wrote:
>
> > On Mon, Jul 19, 2010 at 03:19:23PM -0700, I. E. Smith-Heisters wrote:
>
> > > I posted some code a while back that lets me pause my main thread
> > > while the Ruote worker thread does work. The Ruote worker wakes the
> > > main thread up when it's done. The point being that in certain very
> > > important places I can simulate synchronicity with Ruote so human
> > > users can be assured that they'll see the results of their (HTTP)
> > > requests.
>
> > > The code I posted (below) relied on subclassing StorageParticipant and
> > > overloading the #consume method. Now that we've added concurrency to
> > > our process definition this approach has an unforeseen (but now
> > > obvious) shortcoming. When we're in a concurrent branch, the
> > > completion of one branch does not send the workitem to another
> > > participant (since we're waiting on the other branch), and #consume is
> > > never called.
>
> > > I have a couple possible fixes up my sleeve: overriding
> > > Ruote::Exp::ParticipantExpression#reply to continue the waiting thread
> > > (hacky), or not waiting in the first place if we're in a concurrent
> > > branch (inconsistent behavior). However, I was wondering if someone
> > > might have some suggestions that are less error-prone and better
> > > supported by the existing interfaces (ie. no monkey-patching).
>
> > Hello,
>
> > maybe you could have a look at Participant#on_reply. It was recently
> > enhanced to work with "instantiated participants", i.e. participants
> > instantiated at registration time (I have the impression that's what
> > you use).
>
> Nope. I use `register_participant 'foo', Foo`.
>
> > The test case is :
>
> >  http://github.com/jmettraux/ruote/blob/ruote2.1/test/functional/ft_43...
>
> > Basically, it's a callback triggered when the workitem reaches back to
> > the participant.
>
> on_reply looks functionally identical to what I did by overloading
> StorageParticipant#reply in a subclass. It's nicer, so I can switch to
> that, but it still won't solve my problem (see below).
>
> > I'm not sure I fully understand your case. It seems that if there are 
> > multiple workitems in your engine, any of them will wake up the waiting 
> > thread when reaching a storage participant.
>
> *All* my code is currently run in response to user requests, and all
> my participants are storage participants. There's one user request per
> ruby process, so there's no chance of the waiting thread being woken
> by a workitem from another ruote process.
>
> > Or, some speculation: what about a participant that explicitly wakes up
> > your thread?
>
> hmmmm... this seems promising.
>
> > ---8<---
> > pdef = Ruote.process_definition do
> >   sequence do
> >     participant 'patient'
> >     do_the_work
> >     participant 'waker'
> >   end
> > end
>
> > module Bedroom
>
> >   @guests = {}
>
> >   def self.sleep (label)
> >     @guests[label] = Thread.current
> >     Thread.stop
> >   end
>
> >   def self.wakeup (label)
> >     t = @guests.delete(label)
> >     t.wakeup
> >   end
> > end
>
> > class PatientParticipant < Ruote::StorageParticipant
>
> >   def reply (workitem)
> >     label = workitem.fei.to_s
> >     workitem.fields['sleeper'] = label
> >     super(workitem)
> >     Bedroom.sleep(label)
> >   end
> > end
>
> > class WakerParticipant
> >   include Ruote::LocalParticipant
>
> >   def consume (workitem)
> >     Bedroom.wakeup(workitem.fields.delete('sleeper'))
> >     reply_to_engine(workitem)
> >   end
> > end
> > --->8---
>
> > (This assumes that the worker is running in the same ruby process as the 
> > web application).
>
> > When the patient participant replies to the engine, the replying thread is 
> > put to sleep. It's woken up a bit later by a specific 'waker' participant.
>
> > You could also rely on something inside "do_the_work" to wake up the thread.
>
> > Sorry if I'm completely off.
>
> Not at all, this is exactly the fresh perspective I need.
> Sorry I was unclear -- I was too deep in the code to explain it
> well. Let me try again.
>
> Let's take this example pdef, where all participants are storage
> participants:
>
>   sequence do
>     concurrence do
>       alpha
>       cursor do
>         bravo
>         charlie
>       end
>     end
>     delta
>   end
>
> Now, simplified, my user-request code might look like this:
>
>   # user POST /<wfid>/alpha
>   def process_user_request wfid, pname
>     # I've ensured there can't be two workitems for 'alpha' on the same wfid
>     wi = Workitems.by_wfid_and_pname(wfid, pname)
>     Participants::Patient.wait_for do
>       Workitems.reply wi
>     end
>   end
>
> The ::wait_for code, as seen in my first post, basically puts the
> current thread to sleep, and causes calls to
> StorageParticipant#consume to wake it up. So, the following test
> works:
>
>   Workitems.by_wfid('<wfid>').map(&:participant_name).should ==
>     ['alpha', 'bravo']
>   process_user_request '<wfid>', 'bravo'
>   Workitems.by_wfid('<wfid>').map(&:participant_name).should ==
>     ['alpha', 'charlie']
>
> So, the wait_for ensures not only that the #reply has been processed,
> but that the next participant has been reached (ie. #consume has been
> called).
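>
> (For context, the relevant bits of the Patient participant, paraphrased
> from my first post -- note I've swapped the Thread.stop/wakeup pair for
> a Queue here to sidestep a lost-wakeup race, but the idea is the same.)
>
>   require 'thread' # for Queue
>
>   module Participants
>     class Patient < Ruote::StorageParticipant
>
>       @queue = Queue.new
>
>       # runs the block (which replies a workitem to the engine), then
>       # blocks the calling (request) thread until #consume signals it
>       def self.wait_for
>         yield
>         @queue.pop
>       end
>
>       def self.continue
>         @queue.push(true)
>       end
>
>       def consume (workitem)
>         super(workitem) # store the workitem as usual
>         # the next participant (us) now holds its workitem; resume the
>         # request thread parked in ::wait_for
>         self.class.continue
>       end
>     end
>   end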
>
> Now, take the failing test:
>
>   Workitems.by_wfid('<wfid>').map(&:participant_name).should ==
>     ['alpha', 'bravo']
>   process_user_request '<wfid>', 'alpha' # difference: replying to the alpha participant
>   Workitems.by_wfid('<wfid>').map(&:participant_name).should ==
>     ['bravo'] # FAIL!
>
> This fails--the ruby process hangs on the call to
> #process_user_request. This is because it's waiting for #consume, but
> bravo's #consume has already been called!
>
> Ok, we can add #on_reply to the mix:
>
>   class PatientParticipant
>     def on_reply wi
>       self.class.continue if
>         fetch_flow_expression(wi).parent.class.to_s =~ /Concurrence/i
>     end
>   end
>
> Ok, so this makes the test above pass even though it's obviously dumb.
> It generally relies on #consume, but in the case of alpha it continues
> the sleeping thread from #on_reply. So even this longer test works:
>
>   Workitems.by_wfid('<wfid>').map(&:participant_name).should ==
>     ['alpha', 'bravo']
>   process_user_request '<wfid>', 'alpha'
>   Workitems.by_wfid('<wfid>').map(&:participant_name).should ==
>     ['bravo']
>   process_user_request '<wfid>', 'bravo'
>   Workitems.by_wfid('<wfid>').map(&:participant_name).should ==
>     ['charlie']
>   process_user_request '<wfid>', 'charlie'
>   Workitems.by_wfid('<wfid>').map(&:participant_name).should ==
>     ['delta']
>
> BUUUT, if you go the other way by replying to bravo first, it fails of
> course:
>
>   Workitems.by_wfid('<wfid>').map(&:participant_name).should ==
>     ['alpha', 'bravo']
>   process_user_request '<wfid>', 'bravo'
>   Workitems.by_wfid('<wfid>').map(&:participant_name).should ==
>     ['alpha', 'charlie']
>   process_user_request '<wfid>', 'charlie' # BONK!
>   Workitems.by_wfid('<wfid>').map(&:participant_name).should ==
>     ['alpha']
>
> It stalls forever: delta won't receive a dispatch until alpha is done,
> alpha has already received its dispatch so nothing will continue the
> thread, and I have no way to tell that charlie was the last node in a
> concurrent branch.
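>
> To picture where everything is stuck at that point (same pdef as above,
> annotated):
>
>   sequence do
>     concurrence do
>       alpha          # workitem still parked in storage, never replied
>       cursor do
>         bravo        # already done
>         charlie      # just replied -- last step of this branch
>       end
>     end
>     delta            # won't be dispatched until alpha replies too
>   end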
>
> Clearer? TMI?
>
> #on_reply won't work in the general case, when there's sequential
> execution, because it will wake the sleeping thread before the next
> participant has received its workitem. So maybe a WakeupParticipant
> would work (I like its simplicity) -- but it will double the size of
> all our pdef code (oh well; sketched below). Another option that's
> looking increasingly attractive is just to poll the worker to see if
> it's done. The ideal solution would be to detect when you're at the end
> of a branch while other branches are still holding up execution. The
> `concurrence :count => x` feature, though, could make that even more
> difficult...
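>
> Concretely (exact placement of the wakers aside), the WakeupParticipant
> option would turn the example pdef into something like this, which is
> where the size complaint comes from:
>
>   sequence do
>     concurrence do
>       sequence do
>         alpha
>         waker
>       end
>       cursor do
>         bravo
>         waker
>         charlie
>         waker
>       end
>     end
>     delta
>     waker
>   end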
>
> Sorry for writing the Great American Novel. Thanks for your advice and
> help.
>
> -Ian

