[ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alex Dettinger updated CAMEL-10272:
-----------------------------------
    Comment: was deleted

(was: Peter, thanks for your input.
As you don't use _parallelAggregate()_, it may not be the cause of this issue. However, it's interesting to note that I could also get _oldExchange_ to be null more than once with _parallelAggregate()_, even with a thread-safe aggregation strategy. It may be an itch to scratch in another ticket.

Back to this issue, I will propose a PR to log such exceptions at DEBUG level so one could have more details when facing such a situation.
We could make a great step forward if you could reproduce the behavior once while logging/unwinding throwables from your aggregation strategy. Or you could also test against master with debug log level if the PR is accepted.)

> Aggregation is broken due to race condition in
> ParallelAggregateTask.doAggregateInternal()
> ------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-10272
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10272
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.16.3, 2.17.3
>         Environment: MacOS 10.11.6, JRE 1.7.0_79
>            Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test that
> reproduces the problem. Furthermore, our (complex) unit tests are not
> deterministic due to the root cause of the problem.
> However, I tried to analyze the Camel Java code to work out the problem.
> Please find my findings below.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is processed in parallel:
> {code}
> from("direct:start")
>     .to("direct:pre")
>     .recipientList().method(new MyRecipientListBuilder())
>         .stopOnException()
>         .aggregationStrategy(new MyAggregationStrategy())
>         .parallelProcessing()
>     .end()
>     .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
>     if (oldExchange == null) {
>         // this is the case more than once which is not expected!
>     }
>     // ...
> {code}
> {{oldExchange}} is null more than once, which is not expected and which
> contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here
> the result object {{AtomicExchange}} is created, which is shared during the
> whole processing.
> If the processing should be done in parallel (as is the case for our
> route), then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one
> instance of {{AggregateOnTheFlyTask}} is initialized and
> {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}} for
> *every* target in the recipient list- via
> {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees
> that this is only done once).
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is
> created, and if aggregation is not done in parallel (as is the case in
> our route), {{ParallelAggregateTask.run()}},
> {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and
> {{ParallelAggregateTask.doAggregateInternal()}} are invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
>     if (strategy != null) {
>         // prepare the exchanges for aggregation
>         Exchange oldExchange = result.get();
>         ExchangeHelper.prepareAggregation(oldExchange, exchange);
>         result.set(strategy.aggregate(oldExchange, exchange));
>     }
> }
> {code}
> However, in {{ParallelAggregateTask.doAggregateInternal()}} a race condition
> may occur, as {{result}} is shared -by every instance of
> {{AggregateOnTheFlyTask}}- such that {{oldExchange = result.get()}} may be
> {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} is created for every
> target in the recipient list, the {{synchronized}} method
> {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sound reasonable?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
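
The note about {{synchronized}} in the analysis rests on a general Java property: a synchronized instance method locks on {{this}}, so calls made on different instances do not exclude each other. The sketch below illustrates only that property. It is not Camel code, and the names {{Task}}, {{meetInside}} and {{overlaps}} are hypothetical, made up for this illustration. Two threads meet at a barrier while inside the guarded section: that can only succeed if both are inside at the same time.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class SynchronizedPerInstanceDemo {

    /** Hypothetical stand-in for a task with a synchronized instance method. */
    static final class Task {
        private final CyclicBarrier barrier;

        Task(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        // Locks on "this": two different Task instances use two different monitors.
        synchronized boolean doAggregate() {
            return meetInside(barrier);
        }
    }

    /** Waits for the other thread; returns true only if it arrived in time. */
    static boolean meetInside(CyclicBarrier barrier) {
        try {
            barrier.await(1, TimeUnit.SECONDS);
            return true;
        } catch (TimeoutException | BrokenBarrierException e) {
            return false; // the other thread never got into the guarded section
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns true if both threads were inside the guarded section simultaneously. */
    static boolean overlaps(boolean sharedLock) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(2);
        Object lock = new Object();
        boolean[] met = new boolean[2];
        Thread[] threads = new Thread[2];
        for (int i = 0; i < 2; i++) {
            final int idx = i;
            final Task task = new Task(barrier); // one instance per thread
            threads[i] = new Thread(() -> {
                if (sharedLock) {
                    // One monitor for both threads: real mutual exclusion.
                    synchronized (lock) {
                        met[idx] = meetInside(barrier);
                    }
                } else {
                    // Each thread locks only its own Task instance.
                    met[idx] = task.doAggregate();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join(); // join makes the writes to met[] visible here
        }
        return met[0] && met[1];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("per-instance synchronized overlaps: " + overlaps(false));
        System.out.println("shared lock overlaps: " + overlaps(true));
    }
}
```

With per-instance locking both threads reach the barrier inside their synchronized methods. With the shared lock the first thread times out while the second is still blocked. Whether this pattern is what actually happens in {{ParallelAggregateTask}} is the question raised in the issue; the sketch does not answer it.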