[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
[ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15715672#comment-15715672 ]

ASF GitHub Bot commented on CAMEL-10272:

Github user asfgit closed the pull request at:

    https://github.com/apache/camel/pull/1326

> Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
> --
>
>                 Key: CAMEL-10272
>                 URL: https://issues.apache.org/jira/browse/CAMEL-10272
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.16.3, 2.17.3
>         Environment: MacOS 10.11.6, JRE 1.7.0_79
>            Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test that reproduces the problem. Furthermore, our (complex) unit tests are not deterministic because of the root cause of the problem.
> However, I tried to analyze the Camel Java code to work out the problem. Please find my findings below.
> h3. Problem
> {{oldExchange}} is {{null}} more than once in the aggregator if a recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is processed in parallel:
> {code}
> from("direct:start")
>     .to("direct:pre")
>     .recipientList().method(new MyRecipientListBuilder())
>         .stopOnException()
>         .aggregationStrategy(new MyAggregationStrategy())
>         .parallelProcessing()
>     .end()
>     .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
>     if (oldExchange == null) {
>         // this is the case more than once, which is not expected!
>     }
>     // ...
> {code}
> {{oldExchange}} is {{null}} more than once, which is not expected and contradicts Camel's contract.
> h3. Analysis
> During processing, Camel invokes {{MulticastProcessor.process()}}. There, the result object {{AtomicExchange}} is created; it is shared for the whole processing.
> If processing is done in parallel (as is the case for our route), then {{MulticastProcessor.doProcessParallel()}} is invoked. There, one instance of {{AggregateOnTheFlyTask}} is initialized and {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}} for *every* target in the recipient list- via {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees that this is done only once).
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is created, and if aggregation is not done in parallel (as is the case in our route), {{ParallelAggregateTask.run()}}, {{ParallelAggregateTask.doAggregate()}} (this method is synchronized) and {{ParallelAggregateTask.doAggregateInternal()}} are invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) {
>     if (strategy != null) {
>         // prepare the exchanges for aggregation
>         Exchange oldExchange = result.get();
>         ExchangeHelper.prepareAggregation(oldExchange, exchange);
>         result.set(strategy.aggregate(oldExchange, exchange));
>     }
> }
> {code}
> However, a race condition may occur in {{ParallelAggregateTask.doAggregateInternal()}} because {{result}} is shared -by every instance of {{AggregateOnTheFlyTask}}-, such that {{oldExchange = result.get()}} may be {{null}} more than once!
> Note: as a new instance of {{ParallelAggregateTask}} is created for every target in the recipient list, the {{synchronized}} method {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sound reasonable?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
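The note about {{synchronized}} can be illustrated outside Camel. The following plain-Java sketch is not Camel code: {{PerTargetTask}} and {{SharedLockDemo}} are hypothetical names, and an {{AtomicReference<String>}} stands in for {{AtomicExchange}}. It assumes, as the description states, that a new task object is created per target. A {{synchronized}} instance method locks on {{this}}, so it only serializes the {{result.get()}} / {{result.set(...)}} pair if every task locks the same object; the sketch therefore locks on one object shared by all tasks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a per-target ParallelAggregateTask:
// a new instance is created for every reply.
final class PerTargetTask implements Runnable {
    private final AtomicReference<String> result; // shared by all tasks, like AtomicExchange
    private final Object lock;                    // ONE lock object shared by all tasks
    private final AtomicInteger nullsSeen;        // how often the "oldExchange" was null
    private final String reply;

    PerTargetTask(AtomicReference<String> result, Object lock, AtomicInteger nullsSeen, String reply) {
        this.result = result;
        this.lock = lock;
        this.nullsSeen = nullsSeen;
        this.reply = reply;
    }

    @Override
    public void run() {
        // A synchronized instance method would lock on `this`; every task is a
        // different object, so get() and set() of two tasks could still interleave.
        // Locking on the shared object makes the read-modify-write atomic.
        synchronized (lock) {
            String old = result.get();
            if (old == null) {
                nullsSeen.incrementAndGet();
            }
            result.set(old == null ? reply : old + "," + reply);
        }
    }
}

final class SharedLockDemo {
    // Submits one task per target to a pool; returns how many saw a null previous value.
    static int aggregate(int targets, AtomicReference<String> result) {
        Object lock = new Object();
        AtomicInteger nullsSeen = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < targets; i++) {
            pool.submit(new PerTargetTask(result, lock, nullsSeen, "r" + i));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("aggregation did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return nullsSeen.get();
    }
}
```

With one lock shared by all tasks, {{SharedLockDemo.aggregate(1000, result)}} should report exactly one null previous value and keep all 1000 replies. Replacing {{synchronized (lock)}} with a per-instance lock such as {{synchronized (this)}} removes that guarantee, although a failure would then be timing-dependent and may not show up in every run.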
[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
[ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712463#comment-15712463 ]

Alex Dettinger commented on CAMEL-10272:

Peter, thanks for your input.
As you don't use _parallelAggregate()_, it may not be the cause of this issue. However, it's interesting to note that I could also make _oldExchange_ null more than once with _parallelAggregate()_, even with a thread-safe aggregation strategy. It may be an itch to scratch in another ticket.
Back to this issue: I will propose a PR to log such exceptions at DEBUG level, so one could have more details when facing such a situation. We could take a big step forward if you could reproduce the behavior once while logging/unwinding throwables from your aggregation strategy. Alternatively, you could test against master with the DEBUG log level if the PR is accepted.
[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
[ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712469#comment-15712469 ]

ASF GitHub Bot commented on CAMEL-10272:

GitHub user aldettinger opened a pull request:

    https://github.com/apache/camel/pull/1326

    CAMEL-10272: Added a debug log and completed the javadoc

    Please check [CAMEL-10272](https://issues.apache.org/jira/browse/CAMEL-10272) out for more details.
    sourcecheck and tests are ok.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aldettinger/camel master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/camel/pull/1326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1326

commit 2e3de5e889241e707054f01f9cf385aea72508fe
Author: aldettinger
Date:   2016-12-01T15:48:48Z

    CAMEL-10272: Added a debug log covering for instance a runtime exception thrown from a custom aggregation strategy

commit b522d465467946a2640630e87d0854298c978953
Author: aldettinger
Date:   2016-12-01T15:57:06Z

    CAMEL-10272: Completed the aggregation strategy javadoc
[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
[ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15709764#comment-15709764 ]

Peter Keller commented on CAMEL-10272:

Alex, thank you very much for the analysis.
You are right: if an exception is thrown, then {{oldExchange}} may be {{null}}. This may well be the case in our route, although it is mysterious why the exception would be thrown non-deterministically (of course, the root cause may be another multithreading issue not directly related to Camel).
Note: if {{parallelAggregate()}} is used (as mentioned above, we don't use this setup), then {{oldExchange}} may also be {{null}}. This can be shown with a debugger by setting a breakpoint in {{doAggregateInternal}} on the line with {{ExchangeHelper.prepareAggregation}}. Shouldn't Camel prevent {{oldExchange}} from being {{null}} in this case?
Logging the (possible) exception thrown by the aggregation strategy could help track down the cause. Of course, this could also be done in the aggregation strategy implementation itself with a try/catch block, without changing the Camel framework.
Unwinding the exception to the default error handler may break existing routes, which is not desirable. That said, I would personally prefer this option.
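Peter's suggestion of a try/catch inside the strategy can be sketched as a decorator. This is a minimal plain-Java sketch under stated assumptions: {{LoggingAggregation}} is a hypothetical name, and {{java.util.function.BinaryOperator}} stands in for Camel's {{AggregationStrategy.aggregate(oldExchange, newExchange)}} so the example runs without Camel on the classpath. It reports the exception and then rethrows it, so the surrounding behavior is unchanged but the failure is no longer silent.

```java
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical decorator around an aggregation function (old, new) -> merged.
// BinaryOperator stands in for Camel's AggregationStrategy in this sketch.
final class LoggingAggregation<T> implements BinaryOperator<T> {
    private static final Logger LOG = Logger.getLogger(LoggingAggregation.class.getName());

    private final BinaryOperator<T> delegate;
    private final Consumer<RuntimeException> onError;

    LoggingAggregation(BinaryOperator<T> delegate, Consumer<RuntimeException> onError) {
        this.delegate = delegate;
        this.onError = onError;
    }

    // Convenience factory that reports failures through java.util.logging.
    static <T> LoggingAggregation<T> withJulLogging(BinaryOperator<T> delegate) {
        return new LoggingAggregation<>(delegate,
                e -> LOG.log(Level.WARNING, "aggregation strategy failed", e));
    }

    @Override
    public T apply(T oldValue, T newValue) {
        try {
            return delegate.apply(oldValue, newValue);
        } catch (RuntimeException e) {
            onError.accept(e); // make the failure visible...
            throw e;           // ...then rethrow so the caller's handling is unchanged
        }
    }
}
```

In a real route, the same try/catch/rethrow would sit inside {{aggregate(Exchange, Exchange)}} of the custom strategy. Rethrowing, rather than quietly returning a fallback, is a deliberate choice here: swallowing the exception would hide it again.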
[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
[ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15694189#comment-15694189 ]

ASF GitHub Bot commented on CAMEL-10272:

Github user aldettinger closed the pull request at:

    https://github.com/apache/camel/pull/1310
[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
[ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15693983#comment-15693983 ]

Alex Dettinger commented on CAMEL-10272:

I don't think that we are facing such a simple race condition here. In the method _doAggregateInternal(...)_, _result.get(...)_ and _result.set(...)_ are not run concurrently: a single thread is used at aggregation time, at least in the provided example.
However, _oldExchange_ could be null more than once when the user's custom aggregation strategy throws a runtime exception:
{code}
public static class Router {
    public String[] routeTo() {
        return new String[] {"log:MSGROUTER_0?level=TRACE", "log:MSGROUTER_1?level=TRACE"};
    }
}

@Override
protected RouteBuilder createRouteBuilder() {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start")
                .recipientList()
                    .method(new Router())
                    .aggregationStrategy(new AggregationStrategy() {
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                            if (oldExchange == null) {
                                System.out.println("oldExchange is null " + newExchange.getExchangeId()
                                        + " thread: " + Thread.currentThread());
                            }
                            System.out.println(3 / 0); // throws java.lang.ArithmeticException
                            return null;
                        }
                    })
                    .parallelProcessing()
                .end();
        }
    };
}
{code}
outputs:
{noformat}
oldExchange is null ID-alex-42036-1479994398470-0-3 thread: Thread[Camel (camel-1) thread #2 - RecipientList-AggregateTask,5,main]
oldExchange is null ID-alex-42036-1479994398470-0-4 thread: Thread[Camel (camel-1) thread #2 - RecipientList-AggregateTask,5,main]
{noformat}
That said, I see the following drawbacks with the current implementation:
* The AggregationStrategy class javadoc lists a single case where oldExchange could be null, whereas two exist.
* This case could be difficult to debug from a Camel user's perspective: Camel kind of hides the runtime exception thrown in the custom aggregation strategy.
From there, I see two ways to handle a runtime exception thrown from a custom aggregation strategy:
* Produce an ERROR log. We would then need to correct the javadoc accordingly.
* Unwind the exception to the default error handler. Note that this is what happens when parallelProcessing is false.
Any thoughts on the right path?
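The mechanism Alex describes, and the two options he lists, can be modelled in plain Java. In this sketch, {{AggregationFailureDemo}} and its methods are hypothetical, {{AtomicReference}} stands in for {{AtomicExchange}}, and {{BinaryOperator}} stands in for the aggregation strategy. {{aggregateOne}} follows the get/aggregate/set shape of {{doAggregateInternal(...)}}: when the strategy throws, {{result.set(...)}} is never reached, so the next reply again sees a {{null}} previous value. {{swallowing}} models catching and logging the exception; {{unwinding}} models handing the first failure back to the caller through {{Future.get()}}.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;

final class AggregationFailureDemo {

    // Same get -> aggregate -> set shape as doAggregateInternal(): if the
    // strategy throws, result.set(...) is never reached and result is unchanged.
    static <T> void aggregateOne(AtomicReference<T> result, BinaryOperator<T> strategy, T reply) {
        T old = result.get();
        result.set(strategy.apply(old, reply));
    }

    // Option 1 (hypothetical): log each failure and keep aggregating.
    // Returns how many calls saw a null previous value ("oldExchange").
    static <T> int swallowing(List<T> replies, BinaryOperator<T> strategy) {
        AtomicReference<T> result = new AtomicReference<>();
        int nullsSeen = 0;
        for (T reply : replies) {
            if (result.get() == null) {
                nullsSeen++;
            }
            try {
                aggregateOne(result, strategy, reply);
            } catch (RuntimeException e) {
                System.err.println("aggregation failed: " + e); // stand-in for an ERROR log
            }
        }
        return nullsSeen;
    }

    // Option 2 (hypothetical): aggregate on a separate thread, but hand the
    // first failure back to the calling thread instead of hiding it.
    // Returns null on success, otherwise the strategy's own exception.
    static <T> Throwable unwinding(List<T> replies, BinaryOperator<T> strategy) {
        ExecutorService aggregator = Executors.newSingleThreadExecutor();
        try {
            Future<?> done = aggregator.submit(() -> {
                AtomicReference<T> result = new AtomicReference<>();
                for (T reply : replies) {
                    aggregateOne(result, strategy, reply); // stops at the first exception
                }
            });
            done.get();
            return null;
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        } finally {
            aggregator.shutdown();
        }
    }
}
```

With a strategy that always throws and two replies, as in Alex's route with two recipients, {{swallowing}} counts two null previous values, mirroring the two "oldExchange is null" lines in his output; with a strategy that does not throw, it counts one. {{unwinding}} returns the {{ArithmeticException}} itself, which the caller could then pass on to an error handler.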
[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
[ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15693955#comment-15693955 ]

ASF GitHub Bot commented on CAMEL-10272:

GitHub user aldettinger opened a pull request:

https://github.com/apache/camel/pull/1310

CAMEL 10272: Fixed few comments

Note that this PR does not fix [CAMEL-10272](https://issues.apache.org/jira/browse/CAMEL-10272). I'll post on the issue tracker directly to detail my findings.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aldettinger/camel master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/camel/pull/1310.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #1310

commit 42efb4ef220beb26e8a49c7e729c81d3e0054a63
Author: aldettinger
Date: 2016-11-24T18:00:02Z

CAMEL 10272: Fixed few comments

> Aggregation is broken due to race condition in
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.16.3, 2.17.3
> Environment: MacOS 10.11.6, JRE 1.7.0_79
> Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test that
> reproduces the problem. Furthermore, our (complex) unit tests are not
> deterministic due to the root cause of the problem.
> However, I tried to analyze the Camel Java code to work out the problem.
> Please find my findings below.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is processed in parallel:
> {code}
> from("direct:start")
>     .to("direct:pre")
>     .recipientList().method(new MyRecipientListBuilder())
>         .stopOnException()
>         .aggregationStrategy(new MyAggregationStrategy())
>         .parallelProcessing()
>     .end()
>     .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange newExchange) {
>     if (oldExchange == null) {
>         // this is the case more than once, which is not expected!
>     }
>     // ...
> }
> {code}
> {{oldExchange}} is {{null}} more than once, which is not expected and
> contradicts the contract with Camel: only the first call to {{aggregate()}}
> should receive a {{null}} {{oldExchange}}.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here
> the result object {{AtomicExchange}} is created, which is shared during the
> whole processing.
> If the processing is done in parallel (as is the case for our
> route), then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one
> instance of {{AggregateOnTheFlyTask}} is initialized and
> {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}} for
> *every* target in the recipient list- via
> {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees
> that this is only done once).
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is
> created, and if aggregation is not done in parallel (as is the case in
> our route), {{ParallelAggregateTask.run()}},
> {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and
> {{ParallelAggregateTask.doAggregateInternal()}} are invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy,
>         AtomicExchange result, Exchange exchange) {
>     if (strategy != null) {
>         // prepare the exchanges for aggregation
>         Exchange oldExchange = result.get();
>         ExchangeHelper.prepareAggregation(oldExchange, exchange);
>         result.set(strategy.aggregate(oldExchange, exchange));
>     }
> }
> {code}
> However, a race condition may occur in
> {{ParallelAggregateTask.doAggregateInternal()}} because {{result}} is shared
> -by every instance of {{AggregateOnTheFlyTask}}-, so that
> {{oldExchange = result.get()}} may be {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} is created for every
> target in the recipient list, the {{synchronized}} method
> {{ParallelAggregateTask.doAggregate()}} (which only locks its own instance)
> does not prevent the race condition!
> Does this sound reasonable?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
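The race described in the analysis can be reproduced outside Camel with a small, self-contained Java sketch. It is not Camel code: {{AggregateTask}}, {{CountingStrategy}}, {{Outcome}} and {{runTargets}} are hypothetical stand-ins (for {{ParallelAggregateTask}}, the {{AggregationStrategy}} and a test harness), strings replace {{Exchange}} objects, and an {{AtomicReference}} replaces {{AtomicExchange}}. As the report describes, each target gets its own task object and all of them share one result holder. A {{synchronized}} instance method only acquires the monitor of its own object, so the get/aggregate/set sequence is not serialized across tasks; acquiring one lock shared by all tasks restores the contract that the strategy sees a {{null}} old value exactly once. This is one possible fix, not necessarily the change eventually made for CAMEL-10272.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class AggregateRaceSketch {

    // Hypothetical stand-in for an AggregationStrategy: joins values
    // with "," and counts calls that receive a null old value.
    static final class CountingStrategy {
        final AtomicInteger nullOldCount = new AtomicInteger();

        String aggregate(String oldValue, String newValue) {
            if (oldValue == null) {
                nullOldCount.incrementAndGet();
                return newValue;
            }
            return oldValue + "," + newValue;
        }
    }

    // Hypothetical stand-in for ParallelAggregateTask: one instance
    // per target, every instance sharing the same result holder.
    static final class AggregateTask implements Runnable {
        private final CountingStrategy strategy;
        private final AtomicReference<String> result;
        private final String value;
        private final Object sharedLock; // null: per-instance lock only

        AggregateTask(CountingStrategy strategy, AtomicReference<String> result,
                      String value, Object sharedLock) {
            this.strategy = strategy;
            this.result = result;
            this.value = value;
            this.sharedLock = sharedLock;
        }

        @Override
        public void run() {
            if (sharedLock == null) {
                doAggregate();
            } else {
                synchronized (sharedLock) { // one monitor for all tasks
                    doAggregateInternal();
                }
            }
        }

        // Locks only "this", so distinct task objects never block each other.
        private synchronized void doAggregate() {
            doAggregateInternal();
        }

        // Unprotected read-modify-write on the shared result.
        private void doAggregateInternal() {
            String oldValue = result.get();
            result.set(strategy.aggregate(oldValue, value));
        }
    }

    static final class Outcome {
        final int nullOldCount;
        final int aggregatedCount;

        Outcome(int nullOldCount, int aggregatedCount) {
            this.nullOldCount = nullOldCount;
            this.aggregatedCount = aggregatedCount;
        }
    }

    // Runs one task per target on a small pool and reports what the strategy saw.
    static Outcome runTargets(int targets, boolean useSharedLock) throws InterruptedException {
        CountingStrategy strategy = new CountingStrategy();
        AtomicReference<String> result = new AtomicReference<>();
        Object sharedLock = useSharedLock ? new Object() : null;
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < targets; i++) {
            pool.execute(new AggregateTask(strategy, result, "t" + i, sharedLock));
        }
        pool.shutdown();
        pool.awaitTermination(60, TimeUnit.SECONDS);
        String joined = result.get();
        int count = joined == null ? 0 : joined.split(",").length;
        return new Outcome(strategy.nullOldCount.get(), count);
    }

    public static void main(String[] args) throws InterruptedException {
        Outcome racy = runTargets(2_000, false);
        Outcome locked = runTargets(2_000, true);
        // Timing-dependent: may show more than one null and lost values.
        System.out.println("per-instance lock: nulls=" + racy.nullOldCount
                + ", values=" + racy.aggregatedCount);
        // With one shared lock: nulls=1, values=2000.
        System.out.println("shared lock: nulls=" + locked.nullOldCount
                + ", values=" + locked.aggregatedCount);
    }
}
```

With the shared lock the sketch should always report one {{null}} and 2000 values. The per-instance run is timing-dependent and may or may not show the problem on a given machine, which fits the reporter's remark that the failing tests were not deterministic.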
[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
[ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15455190#comment-15455190 ]

Peter Keller commented on CAMEL-10272:
--

I updated the analysis: there is only one instance of {{AggregateOnTheFlyTask}}.

> Aggregation is broken due to race condition in
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.16.3, 2.17.3
> Environment: MacOS 10.11.6, JRE 1.7.0_79
> Reporter: Peter Keller
> Priority: Critical
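Peter Keller's correction (only one {{AggregateOnTheFlyTask}} instance) relies on the single-submission guard that the analysis attributes to {{aggregationTaskSubmitted}}. The Java sketch below assumes that flag is an {{AtomicBoolean}} tested with {{compareAndSet(false, true)}}; the report only says the flag ensures the task is submitted once, so this is an illustration, not Camel's code. {{SubmitOnceSketch}}, {{onTargetCompleted}} and {{simulate}} are hypothetical names. However many target threads race through the guard, exactly one wins the compare-and-set and submits the aggregation task. The guard says nothing about the per-target {{ParallelAggregateTask}} objects, which is where the analysis locates the race.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SubmitOnceSketch {

    // Hypothetical counterpart of the aggregationTaskSubmitted flag.
    private final AtomicBoolean taskSubmitted = new AtomicBoolean(false);
    private final AtomicInteger submissions = new AtomicInteger();

    // Called by every target thread; only the first successful CAS submits the task.
    void onTargetCompleted(ExecutorService aggregateExecutor, Runnable aggregateTask) {
        if (taskSubmitted.compareAndSet(false, true)) {
            submissions.incrementAndGet();
            aggregateExecutor.submit(aggregateTask);
        }
    }

    // Releases all target threads at once and returns how many submissions happened.
    static int simulate(int targets) throws InterruptedException {
        SubmitOnceSketch sketch = new SubmitOnceSketch();
        ExecutorService targetPool = Executors.newFixedThreadPool(8);
        ExecutorService aggregatePool = Executors.newSingleThreadExecutor();
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < targets; i++) {
            targetPool.execute(() -> {
                try {
                    start.await(); // maximize contention on the flag
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                sketch.onTargetCompleted(aggregatePool, () -> { /* aggregate on the fly */ });
            });
        }
        start.countDown();
        targetPool.shutdown();
        targetPool.awaitTermination(60, TimeUnit.SECONDS);
        aggregatePool.shutdown();
        aggregatePool.awaitTermination(60, TimeUnit.SECONDS);
        return sketch.submissions.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("submissions: " + simulate(1_000)); // prints "submissions: 1"
    }
}
```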
[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()
[ https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15446658#comment-15446658 ]

Ushnash Shukla commented on CAMEL-10272:

I'll take a look at this.

> Aggregation is broken due to race condition in
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.16.3, 2.17.3
> Reporter: Peter Keller
> Priority: Critical