On Jul 19, 5:59 pm, John Mettraux <[email protected]> wrote:
> On Mon, Jul 19, 2010 at 03:19:23PM -0700, I. E. Smith-Heisters wrote:
>
> > I posted some code a while back that lets me pause my main thread
> > while the Ruote worker thread does work. The Ruote worker wakes the
> > main thread up when it's done. The point being that in certain very
> > important places I can simulate synchronicity with Ruote so human
> > users can be assured that they'll see the results of their (HTTP)
> > requests.
>
> > The code I posted (below) relied on subclassing StorageParticipant and
> > overloading the #consume method. Now that we've added concurrency to
> > our process definition this approach has an unforeseen (but now
> > obvious) shortcoming. When we're in a concurrent branch, the
> > completion of one branch does not send the workitem to another
> > participant (since we're waiting on the other branch), and #consume is
> > never called.
>
> > I have a couple possible fixes up my sleeve: overriding
> > Ruote::Exp::ParticipantExpression#reply to continue the waiting thread
> > (hacky), or not waiting in the first place if we're in a concurrent
> > branch (inconsistent behavior). However, I was wondering if someone
> > might have some suggestions that are less error-prone and better
> > supported by the existing interfaces (ie. no monkey-patching).
>
> Hello,
>
> maybe you could have a look at Participant#on_reply. It was recently enhanced 
> to work with "instantiated participants" (participants instantiated at 
> registration time (I have the impression that it's what you use)).

Nope. I use `register_participant 'foo', Foo`.

>
> The test case is :
>
>  http://github.com/jmettraux/ruote/blob/ruote2.1/test/functional/ft_43...
>
> Basically, it's a callback triggered when the workitem reaches back the 
> participant.

on_reply looks functionally identical to what I did by overloading
StorageParticipant#reply in a subclass. It's nicer, so I can switch to
that, but it still won't solve my problem (see below).

>
> I'm not sure I fully understand your case. It seems that if there are 
> multiple workitems in your engine, any of them will wake up the waiting 
> thread when reaching a storage participant.

*All* my code is currently run in response to user requests, and all
my participants are storage participants. There's one user request per
ruby process, so there's no chance of the waiting thread being woken
by a workitem from another ruote process.

>
> Or, some speculation : what about a participant that explicitly wakes up 
> your thread ?

hmmmm... this seems promising.

>
> ---8<---
> pdef = Ruote.process_definition do
>   sequence do
>     participant 'patient'
>     do_the_work
>     participant 'waker'
>   end
> end
>
> module Bedroom
>
>   @guests = {}
>
>   def self.sleep (label)
>     @guests[label] = Thread.current
>     Thread.stop # Thread.stop is a class method; it puts the current thread to sleep
>   end
>
>   def self.wakeup (label)
>     t = @guests.delete(label)
>     t.wakeup
>   end
> end
>
> class PatientParticipant < Ruote::StorageParticipant
>
>   def reply (workitem)
>     label = workitem.fei.to_s
>     workitem.fields['sleeper'] = label
>     super(workitem)
>     Bedroom.sleep(label)
>   end
> end
>
> class WakerParticipant
>   include Ruote::LocalParticipant
>
>   def consume (workitem)
>     Bedroom.wakeup(workitem.fields.delete('sleeper'))
>     reply_to_engine(workitem)
>   end
> end
> --->8---
>
> (This assumes that the worker is running in the same ruby process as the web 
> application).
>
> When the patient participant replies to the engine, the replying thread is 
> put to sleep. It's woken up a bit later by a specific 'waker' participant.
>
> You could also rely on something inside "do_the_work" to wake up the thread.
>
> Sorry if I'm completely off.

Not at all, this is exactly the completely new perspective I need.
Sorry I was unclear--I was too in the middle of the code to explain it
well. Let me try again.

Let's take this example pdef, where all participants are storage
participants:

  sequence do
    concurrence do
      alpha
      cursor do
        bravo
        charlie
      end
    end
    delta
  end

Now, simplified, my user-request code might look like this:

  # user POST /<wfid>/alpha
  def process_user_request wfid, pname
    # I've ensured that there can't be two workitems for 'alpha' on the
    # same wfid
    wi = Workitems.by_wfid_and_pname(wfid, pname)
    Participants::Patient.wait_for do
      Workitems.reply wi
    end
  end

The ::wait_for code, as seen in my first post, basically puts the
current thread to sleep, and causes calls to
StorageParticipant#consume to wake it up. So, the following test
works:

  Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo']
  process_user_request '<wfid>', 'bravo'
  Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'charlie']

So, the wait_for ensures not only that the #reply has been processed,
but that the next participant has been reached (ie. #consume has been
called).
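
For anyone following along without the first post at hand, here is a
minimal sketch of what such a wait_for/continue pair could look like in
plain Ruby. It is not the code from my first post and not ruote API: the
module name Patient, the timeout argument and the @done flag are all
assumptions made for illustration. The flag is there so that a wakeup
arriving before the caller actually goes to sleep is not lost.

```ruby
require 'thread'

# Hypothetical stand-in for Participants::Patient (names are assumptions).
module Patient
  @mutex = Mutex.new
  @cond  = ConditionVariable.new
  @done  = false

  # Runs the block (e.g. the workitem reply), then sleeps until .continue
  # is called or the timeout expires.
  # Returns true if woken by .continue, false on timeout.
  def self.wait_for(timeout = 5)
    @mutex.synchronize { @done = false }
    yield
    deadline = Time.now + timeout
    @mutex.synchronize do
      until @done
        remaining = deadline - Time.now
        return false if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
    end
    true
  end

  # Meant to be called from the worker thread, e.g. from a participant's
  # #consume, to release the waiting thread.
  def self.continue
    @mutex.synchronize do
      @done = true
      @cond.signal
    end
  end
end

# Usage: the block kicks off the work; another thread calls continue.
woken = Patient.wait_for(2) do
  Thread.new { sleep 0.05; Patient.continue }
end
puts "woken: #{woken}"
```

With a timeout in place, a wakeup that never comes turns into a false
return value instead of a hung process.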

Now, take the failing test:

  Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo']
  # difference: replying to the alpha participant
  process_user_request '<wfid>', 'alpha'
  Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['bravo'] # FAIL!

This fails--the ruby process hangs on the call to
#process_user_request. This is because it's waiting for #consume, but
bravo's #consume has already been called!

Ok, we can add #on_reply to the mix:

  class PatientParticipant
    def on_reply wi
      self.class.continue if fetch_flow_expression(wi).parent.class.to_s =~ /Concurrence/i
    end
  end

Ok, so this makes the test above pass, even though it's obviously dumb.
It still generally relies on #consume, but for a participant directly
inside a concurrence (alpha here), #on_reply continues the sleeping
thread. So even this longer test works:

  Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo']
  process_user_request '<wfid>', 'alpha'
  Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['bravo']
  process_user_request '<wfid>', 'bravo'
  Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['charlie']
  process_user_request '<wfid>', 'charlie'
  Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['delta']

BUUUT, if you go the other way by replying to bravo first, it fails of
course:

  Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'bravo']
  process_user_request '<wfid>', 'bravo'
  Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha', 'charlie']
  process_user_request '<wfid>', 'charlie' # BONK!
  Workitems.by_wfid('<wfid>').map(&:participant_name).should == ['alpha']

It stalls forever. delta won't receive a dispatch until alpha is done,
alpha has already received a dispatch and won't continue the thread,
and I have no way to tell that charlie was the last node in a
concurrent branch.

Clearer? TMI?

#on_reply alone won't work in the general case: with sequential
execution it wakes the sleeping thread before the next participant
has received its workitem. So, maybe a WakeupParticipant would work
(I like its simplicity)--but it will double the size of all our pdef
code (oh well). Another option that's looking increasingly attractive
is just to poll the worker to see if it's done. The ideal solution
would be to detect when you've reached the end of a branch while other
branches are still holding up the concurrence. The
`concurrence :count => x` feature, though, could make that even more
difficult...
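
To make the polling option concrete, a rough sketch in plain Ruby. The
helper name poll_until and its timeout/interval arguments are made up
for illustration, and what "done" means for the ruote worker is left
entirely to the block; I'm not assuming any particular ruote call for
that check.

```ruby
# Hypothetical helper: calls the block repeatedly until it returns a
# truthy value (=> true) or the timeout expires (=> false).
def poll_until(timeout = 5, interval = 0.05)
  deadline = Time.now + timeout
  loop do
    return true if yield
    return false if Time.now >= deadline
    sleep interval
  end
end

# Usage with a stand-in condition; in the app the block would ask
# whatever is appropriate to decide that the worker is idle.
started = Time.now
finished = poll_until(1, 0.01) { Time.now - started > 0.05 }
puts "finished: #{finished}"
```

Polling trades a little latency (at most one interval) for not needing
any wakeup hooks in the pdefs or participants.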

Sorry for writing the Great American Novel. Thanks for your advice and
help.

-Ian
