Hi there, we just had a problem in one of our systems, and it looks like there is an issue with locking in the AggregateProcessor in a distributed environment.
I'll try to explain it:

===================================================
Scenario
===================================================

We use camel-core and camel-hazelcast 2.16.5 with Hazelcast 3.5.2.

We have a route that sends a message to a WebSphere MQ queue (via the JMSComponent) and then passes the message to an aggregator, which uses the JMSCorrelationID to correlate the request with the response:

    from(epAggregation)
        .aggregate(header("JMSCorrelationID"), new CustomAggregationStrategy())
        .completionTimeout(Integer.parseInt(getContext().resolvePropertyPlaceholders("{{timeout}}")))
        .completionSize(2)
        .aggregationRepository(aggrRepo)

The aggregation repository aggrRepo is created like this, where hcInst is an instance of com.hazelcast.core.HazelcastInstance:

    HazelcastAggregationRepository aggrRepo = new HazelcastAggregationRepository("aggrRepoDrsBatch", hcInst);

We also have another route that reads the response from the response queue and forwards it to the aggregator.

The environment consists of two nodes running the same code, so each node runs the send route, the response route and the aggregation.

The problem arises when the response comes back very quickly and is consumed on the node that did not send the request.

========================================
Analysis
========================================

I dug a bit into the Camel code, and it seems to me that the problem is the lock in the AggregateProcessor, because it is local to the JVM in which the code runs. Here is an example to make this clearer:

- Node A sends an MQ message and then puts the message into the aggregator. The AggregateProcessor runs and acquires its lock before entering doAggregation().
- In doAggregation() it tries to get the Exchange from the repository and doesn't find one.
  So it continues with the first message of the aggregation and writes it into the repository.
- At about the same time, after Node A has read from the repository but before it has written the "aggregated" first message, Node B fetches the reply from the response queue and sends it to its aggregator. As on Node A, the AggregateProcessor acquires its lock, and because this code runs in another JVM, the lock is free and the AggregateProcessor proceeds into doAggregation().
- In doAggregation(), Node B tries to get the Exchange from the repository before Node A has written it. Like Node A, it proceeds by creating the first Exchange for the aggregation and writes it into the repository.

The result is that one node overwrites the Exchange the other one created. The aggregation never completes by size (completionSize(2)); it only completes when the timeout fires.

=========================================
Ideas to solve the problem
=========================================

- Optimistic locking may be an option here, since HazelcastAggregationRepository supports it by implementing OptimisticLockingAggregationRepository. I'd like to hear your thoughts on this.
- For now we can stop the route consuming from the response queue on one node to eliminate the error. But this is not a long-term option, because we lose the ability to fail over.
- Another idea is to let the AggregateProcessor obtain its lock object from the repository. For example, the HazelcastAggregationRepository could return a lock for the Hazelcast map, which would lock it across the whole cluster.
- I thought about resending the MQ message in case of a timeout, but since the request has side effects on the system that processes it, this is not really an option.

I hope I made myself clear. If you have any questions that would help you help me, I'd be happy to answer them.

Regards,
Michael
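For what it's worth, here is a small stand-alone Java simulation of the race described in the analysis. No Camel or Hazelcast is involved. A ConcurrentHashMap stands in for the shared Hazelcast map, and a CyclicBarrier forces the unlucky interleaving in which both nodes read before either writes. All names (AggregationRaceDemo, REPO, aggregateWithLocalLock, aggregateOptimistic, the correlation id "ID:4711") are hypothetical. The second variant sketches the compare-and-set-with-retry idea behind optimistic locking. To my understanding, this is roughly what HazelcastAggregationRepository does in optimistic mode (putIfAbsent/replace on the map) and what the AggregateProcessor does when it retries after a conflict. I have not verified this against the 2.16.5 source, so treat it as an illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class AggregationRaceDemo {

    // Hypothetical stand-in for the cluster-wide repository (the Hazelcast map) both nodes share.
    static final ConcurrentMap<String, List<String>> REPO = new ConcurrentHashMap<>();

    // What each node effectively does today: its lock only exists inside its own JVM.
    static void aggregateWithLocalLock(Lock jvmLocalLock, CyclicBarrier bothHaveRead,
                                       String key, String message) throws Exception {
        jvmLocalLock.lock();
        try {
            List<String> old = REPO.get(key);   // both nodes find nothing here
            bothHaveRead.await();               // force the interleaving from the analysis
            List<String> updated = (old == null) ? new ArrayList<>() : new ArrayList<>(old);
            updated.add(message);
            REPO.put(key, updated);             // last writer wins, the other message is lost
        } finally {
            jvmLocalLock.unlock();
        }
    }

    // Optimistic variant: store only if the repository still holds what we read, else retry.
    static void aggregateOptimistic(CyclicBarrier bothHaveRead, String key, String message)
            throws Exception {
        boolean firstAttempt = true;
        while (true) {
            List<String> old = REPO.get(key);
            if (firstAttempt) {
                bothHaveRead.await();           // same unlucky interleaving as above
                firstAttempt = false;
            }
            List<String> updated = (old == null) ? new ArrayList<>() : new ArrayList<>(old);
            updated.add(message);
            boolean stored = (old == null)
                    ? REPO.putIfAbsent(key, updated) == null
                    : REPO.replace(key, old, updated);
            if (stored) {
                return;
            }
            // Another node wrote first: re-read its aggregate and add our message to it.
        }
    }

    interface Step {
        void run() throws Exception;
    }

    static Thread startNode(Step step) {
        Thread t = new Thread(() -> {
            try {
                step.run();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        t.start();
        return t;
    }

    // Aggregates one request and one reply with the same correlation id on two simulated nodes.
    static int aggregatedSize(boolean optimistic) {
        REPO.clear();
        String correlationId = "ID:4711"; // hypothetical JMSCorrelationID
        CyclicBarrier bothHaveRead = new CyclicBarrier(2);
        Lock lockOnNodeA = new ReentrantLock();
        Lock lockOnNodeB = new ReentrantLock(); // separate JVMs, separate lock objects
        Thread nodeA = startNode(() -> {
            if (optimistic) aggregateOptimistic(bothHaveRead, correlationId, "request");
            else aggregateWithLocalLock(lockOnNodeA, bothHaveRead, correlationId, "request");
        });
        Thread nodeB = startNode(() -> {
            if (optimistic) aggregateOptimistic(bothHaveRead, correlationId, "reply");
            else aggregateWithLocalLock(lockOnNodeB, bothHaveRead, correlationId, "reply");
        });
        try {
            nodeA.join();
            nodeB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return REPO.get(correlationId).size();
    }

    public static void main(String[] args) {
        System.out.println("JVM-local locks:    " + aggregatedSize(false) + " message(s) aggregated");
        System.out.println("optimistic locking: " + aggregatedSize(true) + " message(s) aggregated");
    }
}
```

With JVM-local locks the repository ends up holding only one of the two messages, which is exactly the overwrite we see. With the compare-and-set retry, the node that loses the race re-reads the other node's aggregate and both messages end up in it. If I read the Camel documentation correctly, the Camel-side equivalent would be to create the repository with its optimistic flag (new HazelcastAggregationRepository("aggrRepoDrsBatch", true, hcInst)) and add .optimisticLocking() to the aggregate definition. I haven't tried that on 2.16.5 yet.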