Hello Mario, sorry for yesterday evening, my fuse was rather short.
Here is what I understand from your case, summarized in a piece of sample (running) code:

  https://gist.github.com/797852

---8<---
require 'thread' # for the Queue class

require 'rubygems'
require 'yajl'
require 'ruote'

PDEF = Ruote.process_definition do
  sequence do
    device :device => 4
    device :device => 7
  end
end

# Re-opening to add a #device method
#
class Ruote::Workitem
  def device
    params['device'] || fields['device']
  end
end

class AmqpParticipant
  include Ruote::LocalParticipant

  def consume(workitem)
    correlate(workitem)
    $queue << encode(workitem)
  end

  def cancel(fei, flavour)
    # no implementation for this example
  end

  protected

  def encode(workitem)
    Rufus::Json.encode({ 'type' => 2, 'device' => workitem.device })
  end

  def correlate(workitem)
    correlations = @context.storage.get_engine_variable('_correlations') || []
    correlations << [ workitem.device, workitem.fei.sid ]
    @context.storage.put_engine_variable('_correlations', correlations)
    p [ :out, @context.storage.get_engine_variable('_correlations') ]
  end
end

class AmqpReceiver < Ruote::Receiver

  def initialize(engine, options={})
    super
    Thread.new { listen }
  end

  protected

  def listen
    loop do
      sleep(rand * 0.1)
      msg = $queue.pop # blocking
      hsh = (Rufus::Json.decode(msg) rescue nil)
      p [ :receiver, hsh ]
      next if hsh == nil
      case hsh['type']
        when 1
          launch(PDEF)
        when 3
          correlate(msg, hsh)
        else
          $queue << msg # put back message
      end
    end
  end

  def correlate(msg, hsh)
    puts "received message from device #{hsh['device']}"
    correlations = @context.storage.get_engine_variable('_correlations') || []
    p [ :in, correlations ]
    correlation = correlations.find { |cor| cor.first == hsh['device'] }
    if correlation
      correlations.delete(correlation)
      @context.storage.put_engine_variable('_correlations', correlations)
      wi = workitem(correlation[1])
      reply_to_engine(wi) if wi # ignore 'unrelated' msgs
    else
      return # discard
      #$queue << msg # re-queue
      # this version simply discards unexpected messages
      # re-queueing... why not, could make the system busy...
    end
  end
end

class Devices

  def initialize
    @thread = Thread.new { listen }
  end

  def join
    @thread.join
  end

  protected

  def listen
    loop do
      sleep(rand * 0.1)
      msg = $queue.pop # blocking
      hsh = (Rufus::Json.decode(msg) rescue nil)
      p [ :devices, hsh ]
      next if hsh == nil
      case hsh['type']
        when 2
          puts "device #{hsh['device']} received message..."
          $queue << Rufus::Json.encode(hsh.merge('type' => 3))
        else
          $queue << msg # put back message
      end
    end
  end
end

$engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))

$queue = Queue.new

$receiver = AmqpReceiver.new($engine)
$engine.register_participant :device, AmqpParticipant
$devices = Devices.new

$engine.noisy = true # displays all the engine activity

$queue << Rufus::Json.encode({ 'type' => 1 })
$queue << Rufus::Json.encode({ 'type' => 1 })
$queue << Rufus::Json.encode({ 'type' => 1 })

$devices.join
--->8---

It simulates AMQP with a Ruby Queue class. The correlation data is placed in an engine variable. The queue is shared by the participants, the receiver and the devices...

It's tested against ruote 2.1.12 (edge), but it should work with ruote 2.1.11.

I hope this will help, sorry again,

--
John Mettraux - http://jmettraux.wordpress.com

--
you received this message because you are subscribed to the "ruote users" group.
to post : send email to [email protected]
to unsubscribe : send email to [email protected]
more options : http://groups.google.com/group/openwferu-users?hl=en
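
For anyone who wants to look at the correlation step on its own, here is a minimal sketch without ruote or AMQP. It keeps [ device, id ] pairs in memory, much like the '_correlations' engine variable in the sample above. The CorrelationStore class, its method names and the 'wfid-...' ids are made up for illustration; none of them are part of ruote. One difference to note: it removes only the oldest matching pair on each reply, and it guards the list with a Mutex since several threads share it.

```ruby
require 'thread' # Mutex (only needed explicitly on old rubies)

# Hypothetical helper, NOT part of ruote: an in-memory stand-in for the
# '_correlations' engine variable.
class CorrelationStore

  def initialize
    @pairs = [] # list of [ device, workitem_id ] pairs, oldest first
    @mutex = Mutex.new
  end

  # Remember that a message was sent to `device` on behalf of `workitem_id`.
  def register(device, workitem_id)
    @mutex.synchronize { @pairs << [ device, workitem_id ] }
  end

  # Find the oldest pending pair for `device`, remove it and return its
  # workitem id. Returns nil when nothing is waiting (unexpected message).
  def resolve(device)
    @mutex.synchronize do
      index = @pairs.index { |dev, _| dev == device }
      index ? @pairs.delete_at(index).last : nil
    end
  end

  # Snapshot of what is still waiting for a reply.
  def pending
    @mutex.synchronize { @pairs.dup }
  end
end

store = CorrelationStore.new
store.register(4, 'wfid-a') # made-up ids, for illustration only
store.register(4, 'wfid-b')
store.register(7, 'wfid-c')

first = store.resolve(4)   # reply from device 4
unknown = store.resolve(9) # reply from a device nobody is waiting on

p [ :first, first ]
p [ :unknown, unknown ]
p [ :pending, store.pending ]
```

A nil from resolve corresponds to the "discard" branch in the receiver: there is no workitem to hand back to the engine, so the message is dropped.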
