On Wed, Jul 21, 2010 at 7:13 AM, I. E. Smith-Heisters <[email protected]> wrote:
>
> Ok, I finally got something working, but I'm afraid it will be
> brittle. I figured that using a service and notification events would
> be more flexible, produce code with looser coupling with Ruote, and
> provide a path toward better asynchronicity in the future. So, I
> removed the wait code from my participants and added it to a static
> module. I then created a service called "PatientNotifier" like this:
>
>  # Wakes a thread that is stopped using `Patient::wait_for`. It will
> wake the
>  # thread in the following
> circumstances:
>  #
>  # * when a "dispatch" event is received that has a workitem that
> matches the
>  #   criteria passed to
> `Patient::wait_for`
>  # * when a "reply" event is received, and no further events are
> received for
>  #   up to a `listen_for`
> seconds.
>  #
>  # This latter circumstance is potentially problematic if some
> messages take
>  # more than `listen_for` seconds to be processed, but it's the only
> way to
>  # detect situations where a "dispatch" won't be sent until something
> else
>  # happens, eg. when one branch of one or more concurrent branches
> has ended.

> ---------------------------------------------------------------------------------------------
>  <
>  # This is unavoidable given Ruote's design of never knowing what's
> going to
>  # happen next.
> ---------------------------------------------------------------------------------------------
>  <

Framing that one :-)

"embrace the asynchronous"

>  class PatientNotifier <
> ApplicationNotifier
>    class << self
>      def subscribe_to; %w(reply dispatch);
> end
>      def listen_for; 1;
> end
>
> end
>
>    def initialize *args
>      super *args
>     �...@lock =
> Mutex.new
>
> end
>
>    def dispatches; @lock.synchroni...@dispatches}; end
>    def dispatches= new_d; @lock.synchroni...@dispatches = new_d};
> end
>    def listen_thread; @lock.synchroni...@listen_thread}; end
>    def listen_thread= new_d; @lock.synchroni...@listen_thread =
> new_d}; end
>
>    def notify msg
>      send "handle_#{msg["action"]}",
> msg
>    end
>
>    def handle_reply
> msg
>      start_listening
> msg
>    end
>
>    def handle_dispatch msg
>      self.dispatches << msg if dispatches
>      continue
> msg
>
> end
>
>    def continue
> msg
>      wi =
> Ruote::Workitem.new(msg["workitem"])
>      ::Patient.continue wi
>    end
>
>    def start_listening msg
>      if listen_thread
>
> listen_thread.kill
>
> end
>      self.listen_thread = Thread.new do
>        listen
> msg
>      end
>    end
>
>    def listen msg
>      self.dispatches =
> []
>      sleep self.class.listen_for
>      if self.dispatches.empty?
>        continue msg
>      end
>    ensure
>      self.listen_thread, self.dispatches = nil, nil
>    end
>  end

Ouch, I have trouble re-formatting that each time. It looks painful.

> I then call this on my engine: `add_service 'patient_notifier',
> 'notifiers', 'Workflow::PatientNotifier'`
>
> The rest of the code is pretty similar to the code I posted before
> with some slight refactoring to remove it from Participants and put it
> into a singleton Patient module (sans #consume method, of course).
>
> I'm interested to hear your thoughts on this approach.

I read at some point that you'd like to know your position in the flow
in order to not sleep. Maybe this could help :

( gist at http://gist.github.com/483796 )

---8<---
require 'rubygems'
require 'ruote'

engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))

pdef = Ruote.define do
  concurrence do
    alpha
    sequence do
      bravo
      charlie
    end
    delta
  end
end

class MyParticipant
  include Ruote::LocalParticipant

  def consume (workitem)
    p [
      workitem.participant_name,
      :parent_type, parent_type(workitem),
      :last?, last?(workitem)
    ]
    reply(workitem)
  end

  # Returns the name of the parent expression (like 'sequence' or
  # 'concurrence').
  #
  def parent_type (workitem)
    fetch_flow_expression(workitem).parent.name
  end

  # Returns true if the participant expression with this workitem is the
  # last in the list of children of the parent expression
  # (warning, it doesn't mean that much when in a concurrence expression).
  #
  def last? (workitem)
    fexp = fetch_flow_expression(workitem)
    fexp.fei.child_id == fexp.parent.tree[2].length - 1
  end
end

engine.register_participant '.+', MyParticipant

wfid = engine.launch(pdef)

engine.wait_for(wfid)
--->8---

I hope it helps.

-- 
John Mettraux   -   http://jmettraux.wordpress.com

-- 
you received this message because you are subscribed to the "ruote users" group.
to post : send email to [email protected]
to unsubscribe : send email to [email protected]
more options : http://groups.google.com/group/openwferu-users?hl=en

Reply via email to