Hi Michael, it's a bit hard to follow, so I could be misunderstanding your issue: is it that there is a race condition between the aggregator that expects the reply on node A and another aggregator on node B that is not aware of the initial request?

If you're doing only request-reply correlation, perhaps take a look at the InOut message exchange pattern with a correlation property[1], with the replying application setting the ReplyToQMgr to the requester's queue manager. Or, place the reply in a Hazelcast queue regardless of which queue manager the reply landed on and process it from there. Also, I think it would be better to set up the reply coordination expectation (with timeouts, and without transactions -- those would block) before sending the message. 2c

[1] https://camel.apache.org/correlation-identifier.html
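To make the InOut suggestion a bit more concrete, here is an untested sketch of what I mean -- "wmq" stands for however you have named your WebSphere MQ JMS component, and the direct: endpoint names are made up:

    import org.apache.camel.ExchangePattern;
    import org.apache.camel.builder.RouteBuilder;

    public class RequestReplyRoute extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            from("direct:sendRequest")
                // with the InOut MEP the JMS component waits for the reply itself
                // and correlates it on the same node that sent the request
                .to(ExchangePattern.InOut,
                    "wmq:queue:REQUEST.QUEUE"
                    + "?replyTo=REPLY.QUEUE"
                    + "&useMessageIDAsCorrelationID=true"
                    + "&requestTimeout={{timeout}}")
                // the reply is now the message body, so no aggregator is needed
                // for the request/reply correlation itself
                .to("direct:processReply");
        }
    }

The JMS component then does the JMSCorrelationID matching for you on the node that sent the request; the replying application only has to copy the correlation id and honor the ReplyTo destination (and ReplyToQMgr, as mentioned above).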
On Wed, Aug 16, 2017 at 5:10 PM, Michael Lück <michael.lu...@hm-ag.de> wrote:
> Hi there,
>
> we just had an issue in one of our systems and it looks like there is an
> issue with locking in the AggregateProcessor in a distributed environment.
>
> I'll try to explain it:
>
> ===================================================
> Scenario
> ===================================================
>
> We use camel-core and camel-hazelcast 2.16.5 and Hazelcast 3.5.2.
>
> We have a route which sends a message to a WebSphere MQ queue (via the
> JMS component) and after that we put the message into an aggregator which
> uses the JMSCorrelationID to correlate the request and the response.
>
> from(epAggregation)
>     .aggregate(header("JMSCorrelationID"), new CustomAggregationStrategy())
>         .completionTimeout(Integer.parseInt(getContext().resolvePropertyPlaceholders("{{timeout}}")))
>         .completionSize(2)
>         .aggregationRepository(aggrRepo)
>
> The aggregationRepository aggrRepo is created like this:
>
> HazelcastAggregationRepository aggrRepo =
>         new HazelcastAggregationRepository("aggrRepoDrsBatch", hcInst);
>
> where hcInst is an instance of com.hazelcast.core.HazelcastInstance.
>
> We also have another route which reads the response from the response
> queue and forwards it to the aggregator.
>
> The environment consists of two nodes on which the same code is running
> (so essentially the send and response routes and the aggregation).
>
> The problem arises when the response is returned really fast and is
> consumed on the node that didn't send the request.
>
> ========================================
> Analysis
> ========================================
>
> I dug a bit in the Camel code and it seems to me that the problem here is
> the lock in the AggregateProcessor, as it is local to the VM in which the
> code runs. I'll try an example to make this clearer:
>
> - Node A sends an MQ message and after that it puts the message into the
>   aggregator. The AggregateProcessor runs and checks the lock before going
>   into doAggregation().
> - In doAggregation() it tries to get the Exchange from the repository and
>   doesn't find any, so it continues to aggregate the first message and
>   writes this into the repository.
> - At about the same time, between reading the Exchange from the repository
>   and before writing the "aggregated" first message into the repository,
>   Node B fetches the reply from the response queue and sends it to the
>   aggregator. As on node A, the lock is checked, and since the code runs
>   in another VM the lock is free and the AggregateProcessor can go into
>   doAggregation().
> - In doAggregation() the node tries to get the Exchange from the repository
>   before the other node has written it. And like Node A, the code proceeds
>   with creating the first Exchange for the aggregation and writes it into
>   the repository.
>
> The result is that one of the nodes will overwrite the Exchange the other
> created before, and the aggregation will never complete (actually it does,
> but only because of the timeout).
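If I now follow the analysis correctly, it boils down to a classic check-then-act race: both nodes read "no exchange yet" and then both write a first exchange. Here is a stand-alone illustration of that lost update in plain Java -- a ConcurrentHashMap stands in for the replicated Hazelcast map, two threads stand in for the two nodes, and the latch just forces the unlucky interleaving:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.CountDownLatch;

    public class LostUpdateDemo {

        public static void main(String[] args) throws Exception {
            // stands in for the replicated map used by the aggregation repository
            ConcurrentMap<String, String> repo = new ConcurrentHashMap<>();
            // makes sure both "nodes" have read the map before either one writes
            CountDownLatch bothHaveRead = new CountDownLatch(2);

            Thread nodeA = new Thread(() -> aggregate(repo, bothHaveRead, "nodeA:request"));
            Thread nodeB = new Thread(() -> aggregate(repo, bothHaveRead, "nodeB:reply"));
            nodeA.start();
            nodeB.start();
            nodeA.join();
            nodeB.join();

            // prints only one of the two values -- the other update is lost,
            // even though each "node" could have held its own local lock
            System.out.println("aggregated value: " + repo.get("JMSCorrelationID-4711"));
        }

        private static void aggregate(ConcurrentMap<String, String> repo,
                                      CountDownLatch bothHaveRead, String value) {
            String key = "JMSCorrelationID-4711";
            String existing = repo.get(key);   // both nodes see null here
            bothHaveRead.countDown();
            try {
                bothHaveRead.await();          // force the unlucky interleaving
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (existing == null) {
                repo.put(key, value);          // the second put overwrites the first
            } else {
                repo.put(key, existing + " + " + value);
            }
        }
    }

An optimistic repository turns that last put into a compare-and-set (putIfAbsent/replace), so the second writer sees the conflict and can retry instead of overwriting -- which is essentially what the OptimisticLockingAggregationRepository contract is for.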
> =========================================
> Ideas to solve the problem
> =========================================
>
> - Probably optimistic locking is an option here, as
>   HazelcastAggregationRepository supports this by implementing
>   OptimisticLockingAggregationRepository.
>   => I'd like to hear your thoughts on this.
>
> - Currently we can stop the route consuming from the response queue on one
>   node to eliminate the error. But this is not a long-term option because
>   we lose the ability to fail over.
>
> - Perhaps the AggregateProcessor could get the lock object from the
>   repository. So, for example, the HazelcastAggregationRepository could
>   return the lock object for the Hazelcast map, which would lock it for
>   the whole cluster.
>
> - I thought about resending the MQ message in case of a timeout, but as the
>   request has side effects on the system that processes the message, this
>   is not really an option.
>
> So I hope I could make myself clear. If you have any questions that would
> help you to help me, I'd be happy to answer them.
>
> Regards,
> Michael

--
Zoran Regvart
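PS: regarding the optimistic locking idea in your list -- since HazelcastAggregationRepository implements OptimisticLockingAggregationRepository, wiring it up should look roughly like the untested sketch below. The constructor with the optimistic flag is what the camel-hazelcast documentation shows, so please verify it against 2.16.5; epAggregation, hcInst and CustomAggregationStrategy are the ones from your mail, and this goes inside your existing RouteBuilder:

    // repository created in optimistic mode (second argument), backed by your HazelcastInstance
    HazelcastAggregationRepository aggrRepo =
            new HazelcastAggregationRepository("aggrRepoDrsBatch", true, hcInst);

    from(epAggregation)
        .aggregate(header("JMSCorrelationID"), new CustomAggregationStrategy())
            .aggregationRepository(aggrRepo)
            // retry the aggregation when the other node updated the entry in the
            // meantime, instead of silently overwriting it
            .optimisticLocking()
            .completionSize(2)
            .completionTimeout(Integer.parseInt(getContext().resolvePropertyPlaceholders("{{timeout}}")));
            // ... rest of your aggregation route as before

The idea is that conflicting writes from the other node are then detected at the repository level and retried, so the JVM-local lock in the AggregateProcessor no longer has to protect you.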