Hello Mario,

sorry for yesterday evening, my fuse was rather short.

Here is what I understand from your case, summarized in a piece of sample 
(running) code :

  https://gist.github.com/797852

---8<---
require 'thread'
  # for the Queue class

require 'rubygems'
require 'yajl'
require 'ruote'

PDEF = Ruote.process_definition do
  sequence do
    device :device => 4
    device :device => 7
  end
end

# Re-opening to add a #device method
#
class Ruote::Workitem
  def device
    params['device'] || fields['device']
  end
end

class AmqpParticipant
  include Ruote::LocalParticipant

  def consume(workitem)
    correlate(workitem)
    $queue << encode(workitem)
  end
  def cancel(fei, flavour)
    # no implementation for this example
  end
  protected
  def encode(workitem)
    Rufus::Json.encode({ 'type' => 2, 'device' => workitem.device })
  end
  def correlate(workitem)
    correlations = @context.storage.get_engine_variable('_correlations') || []
    correlations << [ workitem.device, workitem.fei.sid ]
    @context.storage.put_engine_variable('_correlations', correlations)

    p [ :out, @context.storage.get_engine_variable('_correlations') ]
  end
end

class AmqpReceiver < Ruote::Receiver

  def initialize(engine, options={})
    super
    Thread.new { listen }
  end
  protected
  def listen
    loop do
      sleep(rand * 0.1)
      msg = $queue.pop # blocking
      hsh = (Rufus::Json.decode(msg) rescue nil)
      p [ :receiver, hsh ]
      next if hsh == nil
      case hsh['type']
        when 1
          launch(PDEF)
        when 3
          correlate(msg, hsh)
        else
          $queue << msg # put back message
      end
    end
  end
  def correlate(msg, hsh)

    puts "received message from device #{hsh['device']}"

    correlations = @context.storage.get_engine_variable('_correlations') || []

    p [ :in, correlations ]

    correlation = correlations.find { |cor| cor.first == hsh['device'] }

    if correlation
      correlations.delete(correlation)
      @context.storage.put_engine_variable('_correlations', correlations)
      wi = workitem(correlation[1])
      reply_to_engine(wi) if wi
        # ignore 'unrelated' msgs
    else
      return # discard
      #$queue << msg # re-queue
        # this version simply discards unexpected messages
        # re-queueing... why not, could make the system busy...
    end
  end
end

class Devices

  def initialize
    @thread = Thread.new { listen }
  end
  def join
    @thread.join
  end
  protected
  def listen
    loop do
      sleep(rand * 0.1)
      msg = $queue.pop # blocking
      hsh = (Rufus::Json.decode(msg) rescue nil)
      p [ :devices, hsh ]
      next if hsh == nil
      case hsh['type']
        when 2
          puts "device #{hsh['device']} received message..."
          $queue << Rufus::Json.encode(hsh.merge('type' => 3))
        else
          $queue << msg # put back message
      end
    end
  end
end

$engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))
$queue = Queue.new
$receiver = AmqpReceiver.new($engine)
$engine.register_participant :device, AmqpParticipant
$devices = Devices.new

$engine.noisy = true
  # displays all the engine activity

$queue << Rufus::Json.encode({ 'type' => 1 })
$queue << Rufus::Json.encode({ 'type' => 1 })
$queue << Rufus::Json.encode({ 'type' => 1 })

$devices.join
--->8---

It simulates AMQP with a Ruby Queue class. The correlation data is placed in an 
engine variable.

The queue is shared by the participants, the receiver and the devices...

It's tested against ruote 2.1.12 (edge), but it should work with ruote 2.1.11.


I hope this will help, sorry again,

-- 
John Mettraux - http://jmettraux.wordpress.com

-- 
you received this message because you are subscribed to the "ruote users" group.
to post : send email to [email protected]
to unsubscribe : send email to [email protected]
more options : http://groups.google.com/group/openwferu-users?hl=en

Reply via email to