Hi again, Retaking this after a week on another project.
I'm using a Hash for correlation. I'm interacting with an external device via JSON messages over AMQP. Each message has a "command" field which is used to indicate the message type. Some of the commands are used to start specific workflows (I have a pdefs directory where the workflows are named depending on the command that is supposed to trigger them). My subclass of the AMQPReceiver checks whether there is a workflow for each specific message and launches it. At some point a workflow can wait for a specific command to arrive (perhaps an ACK or some indication from the external device). For that I have a participant that adds the workitem to the aforesaid Hash, keyed by the device ID (from what you said, I should probably save the FEI instead of the workitem itself in the Hash). When a message comes in, the Receiver checks if there is a workitem waiting for it before checking for a workflow to start. If there is a workitem, it replies to the engine with it. The thing is, if Ruote goes down, the Hash will disappear. That means that any workflows that are waiting for messages will stay in that state forever. OTOH, because of the nature of the system, if Ruote goes down for more than a couple of minutes I really should restart everything from scratch (even emptying the AMQP queues), since any incoming messages might result in actions that shouldn't take place. The thing at the moment is that the Storage is persistent while the Hash is not. My view is that, either both should be persistent, or both non-persistent (or use timeouts if I continue with the current implementation - which I should be doing anyway). Because of what I said in the previous paragraph, I'm more inclined to make everything non-persistent. Ideas? Thanks, -Mario. -- I want to change the world but they won't give me the source code. On Thu, Jun 23, 2011 at 00:35, John Mettraux <[email protected]> wrote: > > On Wed, Jun 22, 2011 at 07:46:25PM +0200, Mario Camou wrote: > > > > I have a participant/receiver combination that extends > > RuoteAMQP::ParticipantProxy / RuoteAMQP::Receiver. The Participant > doesn't > > call reply_to_engine in consume(), it just registers the workitem in an > > in-memory Hash and returns. The Receiver looks up the workitem in the > Hash > > when it gets a message and does the receive (or a launch if there is no > > corresponding workitem). > > Hello Mario, > > note that you don't have to save the workitem in a hash like you do. The > engine already did that for you (although it did in the storage). > > You can retrieve the workitem later by doing in the participant : > > ---8<--- > emitted_workitem = applied_workitem(fei_or_received_workitem) > --->8--- > > This method comes from Ruote::LocalParticipant (and Ruote::ReceiverMixin). > > > What happens if Ruote goes down after the execution of the Participant > but > > before the Receiver gets the message? Will the workitem be persisted (so > I > > should probably persist the Hash too) or not? > > > > In my use case I think it makes more sense for it NOT to be persisted. As > a > > matter of fact, I'm thinking of flushing the AMQP queue before starting > up > > so we don't have any backed-up messages. > > Your implementation saves before (or right after emitting the workitem over > the AMQP wire). It seems you care about the workitem. I'd suggest using the > applied_workitem as shown above. > > > Best regards, > > -- > John Mettraux - http://jmettraux.wordpress.com > > -- > you received this message because you are subscribed to the "ruote users" > group. > to post : send email to [email protected] > to unsubscribe : send email to > [email protected] > more options : http://groups.google.com/group/openwferu-users?hl=en > -- you received this message because you are subscribed to the "ruote users" group. to post : send email to [email protected] to unsubscribe : send email to [email protected] more options : http://groups.google.com/group/openwferu-users?hl=en
