On Tue, Feb 28, 2012 at 05:55:03PM +0100, Nicola wrote:
>
> (...)
>
> The main problem I am facing is: what is the best way to execute the computer
> tasks in separate (Ruby) processes, and start/stop them and track their 
> status,
> say, from a web interface à la ruote-kit? As far as I can see, Ruote can 
> spawn a
> new thread when it hands a workitem to a participant, but not a new (Ruby)
> process. Who should be responsible for spawning a separate Ruby process? 
> Should
> it be done in the participant? Or in the “main” program? Or should I use some
> client-server architecture?

Hello Nicola,

Not everybody needs ruote to fork a new Ruby process for a participant
execution, most of the use cases don't require that.

Somehow, since ruote has a worker architecture, as soon as you have multiple
workers, you are actually executing task in separate Ruby processes.

If I read correctly, you want to run commands via ruote. A very naive
implementation:

---8<---
class SpawnerParticipant
  include Ruote::LocalParticipant

  def on_workitem
    workitem.fields['result'] = `#{workitem.fields['command']}`
    reply
  end

  def on_cancel
    # well, can't do much... :-(
  end
end
--->8---

A better implementation would probably use something like

  https://github.com/ahoward/open4

or

  https://github.com/kschiess/procrastinate

store the PIDs somehow (and the mapping between a workitem id and a PID) for
cancel attempts...

You could store the identifier in the command itself so that the cancel code
can skim the "ps" output to retrieve the PID to signal to...


> I have a feeling that workers play a role here, but how they... work (ehm) is
> not that clear to me yet. I have read both the Ruote-Kit Readme and the blog
> post about Ruote 2.1, but what code like this does
>
> storage = Ruote::FsStorage.new('ruote_work')
> worker = Ruote::Worker.new(storage)
> worker.run
>   # current thread is now running worker
>
> and whether it is self-contained is still a mystery to me (how does this know
> what to look for in the storage? What does it “run”?). When I run it, a script
> like the above either gets stuck or it gives an error like “no JSON backend
> found” (in the case of ruote-kit). I would be glad if someone could help me 
> make
> my mind clear on these issues.

A detailed issue report would help us help you.

When it gets "stuck", I guess it simply loops for work and since there is
nothing to do, it appears to be stuck.

Doing something like "require 'rufus-json/automatic'" can alleviate the "no
JSON backend found".

.

Generally the code above comes as

  # self-contained dashboard + worker + storage

  dashboard = Ruote::Dashboard.new(
    Ruote::Worker.new(
      Ruote::FsStorage.new('ruote_work')))

Useful for testing.

  # dashboard only (no ruote process execution takes place here, front-end)

  dashboard = Ruote::Dashboard.new(
    Ruote::FsStorage.new('ruote_work'))

  # worker only (no control, pure execution, back-end)

  worker = Ruote::Worker.new(
    Ruote::FsStorage.new('ruote_work'))
  worker.run

The trick is that, when a Dashboard is passed a worker at initialization, it
will call its #run method, so the last bit can be rewritten as:

  dashboard = Ruote::Dashboard.new(
    Ruote::Worker.new(
      Ruote::FsStorage.new('ruote_work')))

and be equivalent (well, there is visibly one object more, an instance of
Dashboard, but it's not very heavy).

Granted, it may look byzantine, but I've settled on this "encapsulation"
mecha to embody composition of dashboard / worker / storage and variants.
It's byzantine enough that people stop and start thinking about the
implications.

.

Now about the "running" thing. When a worker runs, it polls the storage for
messages that are pieces of ruote process execution. The msg generally
involves fetching a flow expression or instantiating a new one and do
something with it.

When the message is handled, it is discarded, but probably
a new, resulting, message has been pushed to the storage (that new message
will get consumed by the first worker who'll put its hand on it).

Note that in the case of a concurrence application, one message per
concurrent branch is issued.

.

Here is what the #run method looks like:

  
https://github.com/jmettraux/ruote/blob/147bad157e4a6ef387fe812a8da53a74c530af6a/lib/ruote/worker.rb#L88-94


> And if I am allowed to abuse your patience a bit more, I have also a couple of
> specific, and probably naive, questions:
>
> 1) if I define a participant by subclassing Ruote::StorageParticipant, do I
> still need to mixin Ruote::LocalParticipant?

No,

---8<---
module M
  def x
    p :x
  end
end

class A
  include M
  def y
    p :y
  end
end

class B < A
end

b = B.new
b.x # => :x
b.y # => :y
--->8---

> (2) In many examples of process definitions, participants are passed a :task
> parameter. Which makes me wonder: does that have a special meaning in Ruote? I
> have never seen an example of a participant implementation making use of the
> :task parameter, so I have always assumed that it is a name like another and
> have used it as follows:
>
> class MyParticipant
>   include Ruote::LocalParticipant
>
>   def consume(workitem)
>     case workitem.params[:task]
>     when 'do this' then dothis(workitem)
>     when 'do that' then dothat(workitem)
>     end
>   end
>
> private
>   def dothis(wi)
>     [...]
>   end
>
>   def dothat(wi)
>     [...]
>   end
> end
>
> Is this the intended usage pattern?

It's a possible usage pattern.

Most of the time, the :task attribute is used as a "title" for the workitem
handed to a human participant. So it's more of a convention.

You are totally free to follow the pattern you described.


Questions are welcome. Best regards,

--
John Mettraux - http://lambda.io/processi

-- 
you received this message because you are subscribed to the "ruote users" group.
to post : send email to [email protected]
to unsubscribe : send email to [email protected]
more options : http://groups.google.com/group/openwferu-users?hl=en

Reply via email to