[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

2016-12-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15715672#comment-15715672
 ] 

ASF GitHub Bot commented on CAMEL-10272:


Github user asfgit closed the pull request at:

https://github.com/apache/camel/pull/1326


> Aggregation is broken due to race condition in 
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 2.16.3, 2.17.3
> Environment: MacOS 10.11.6, JRE 1.7.0_79
>Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test for 
> comprehending the problem. Furthermore our (complex) unit tests are not 
> deterministic due to the root cause of the problem.
> However I tried to analyze the Camel Java code, to work out the problem. 
> Please find below my findings.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a 
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is worked of in parallel:
> {code}
>  from("direct:start")
> .to("direct:pre")
> .recipientList().method(new MyRecipientListBuilder())
> .stopOnException()
> .aggregationStrategy(new MyAggregationStrategy())
> .parallelProcessing()
> .end()
> .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange 
> newExchange) {
> if (oldExchange == null) {
> // this is the case more than once which is not expected!
> }
> // ...
> {code}
> {{oldExchange}} is null more than once which is not expected and which 
> contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here 
> the result object {{AtomicExchange}} is created which is shared during the 
> whole processing.
> If the processing should be done in parallel (as it is the case for our 
> route) then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one 
> instance of {{AggregateOnTheFlyTask}} is initialized and 
> {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}}  for 
> *every* target in the recipient list-. via 
> {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees 
> that this is only be done once)
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is 
> generated, and if aggregation is not done in parallel (as it is the case in 
> our route), {{ParallelAggregateTask.run()}}, 
> {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and 
> {{ParallelAggregateTask.doAggregateInternal()}} is invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, 
> AtomicExchange result, Exchange exchange) {
> if (strategy != null) {
> // prepare the exchanges for aggregation
> Exchange oldExchange = result.get();
> ExchangeHelper.prepareAggregation(oldExchange, exchange);
> result.set(strategy.aggregate(oldExchange, exchange));
> }
> } 
> {code}
> However, in {{ParallelAggregateTask.doAggregateInternal()}} there may occur a 
> race condition as {{result}} is shared -by every instance of 
> {{AggregateOnTheFlyTask}}- such that {{oldExchange = result.get()}} may be 
> {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} for every target in 
> recipient list is created, the {{synchronized}} method 
> {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sounds reasonably?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

2016-12-01 Thread Alex Dettinger (JIRA)

[ 
https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712463#comment-15712463
 ] 

Alex Dettinger commented on CAMEL-10272:


Peter, thanks for your inputs.

  As you don't use _parallelAggregate()_, it may not be the cause of this issue.
However, it's interesting to note that I could also produce _oldExchange_ to be 
null more than once with _parallelAggregate()_, even with a thread safe 
aggregation strategy.
It may be an itch to scratch in another ticket.

  Back to this issue, I will propose a PR to log such exceptions in DEBUG level 
so one could have more details when facing such a situation.
We could make a great step forward if you could reproduce the behavior once 
while logging/unwinding throwables from your aggregation strategy.
Or you could also test against master with debug log level if the PR is 
accepted.

> Aggregation is broken due to race condition in 
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 2.16.3, 2.17.3
> Environment: MacOS 10.11.6, JRE 1.7.0_79
>Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test for 
> comprehending the problem. Furthermore our (complex) unit tests are not 
> deterministic due to the root cause of the problem.
> However I tried to analyze the Camel Java code, to work out the problem. 
> Please find below my findings.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a 
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is worked of in parallel:
> {code}
>  from("direct:start")
> .to("direct:pre")
> .recipientList().method(new MyRecipientListBuilder())
> .stopOnException()
> .aggregationStrategy(new MyAggregationStrategy())
> .parallelProcessing()
> .end()
> .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange 
> newExchange) {
> if (oldExchange == null) {
> // this is the case more than once which is not expected!
> }
> // ...
> {code}
> {{oldExchange}} is null more than once which is not expected and which 
> contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here 
> the result object {{AtomicExchange}} is created which is shared during the 
> whole processing.
> If the processing should be done in parallel (as it is the case for our 
> route) then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one 
> instance of {{AggregateOnTheFlyTask}} is initialized and 
> {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}}  for 
> *every* target in the recipient list-. via 
> {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees 
> that this is only be done once)
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is 
> generated, and if aggregation is not done in parallel (as it is the case in 
> our route), {{ParallelAggregateTask.run()}}, 
> {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and 
> {{ParallelAggregateTask.doAggregateInternal()}} is invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, 
> AtomicExchange result, Exchange exchange) {
> if (strategy != null) {
> // prepare the exchanges for aggregation
> Exchange oldExchange = result.get();
> ExchangeHelper.prepareAggregation(oldExchange, exchange);
> result.set(strategy.aggregate(oldExchange, exchange));
> }
> } 
> {code}
> However, in {{ParallelAggregateTask.doAggregateInternal()}} there may occur a 
> race condition as {{result}} is shared -by every instance of 
> {{AggregateOnTheFlyTask}}- such that {{oldExchange = result.get()}} may be 
> {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} for every target in 
> recipient list is created, the {{synchronized}} method 
> {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sounds reasonably?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

2016-12-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712469#comment-15712469
 ] 

ASF GitHub Bot commented on CAMEL-10272:


GitHub user aldettinger opened a pull request:

https://github.com/apache/camel/pull/1326

CAMEL-10272: Added a debug log and completed the javadoc

Please check 
[CAMEL-10272](https://issues.apache.org/jira/browse/CAMEL-10272) out for more 
details.
sourcecheck and tests are ok.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aldettinger/camel master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/camel/pull/1326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1326


commit 2e3de5e889241e707054f01f9cf385aea72508fe
Author: aldettinger 
Date:   2016-12-01T15:48:48Z

CAMEL-10272: Added a debug log covering for instance a runtime exception 
thrown from a custom aggregation strategy

commit b522d465467946a2640630e87d0854298c978953
Author: aldettinger 
Date:   2016-12-01T15:57:06Z

CAMEL-10272: Completed the aggregation strategy javadoc




> Aggregation is broken due to race condition in 
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 2.16.3, 2.17.3
> Environment: MacOS 10.11.6, JRE 1.7.0_79
>Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test for 
> comprehending the problem. Furthermore our (complex) unit tests are not 
> deterministic due to the root cause of the problem.
> However I tried to analyze the Camel Java code, to work out the problem. 
> Please find below my findings.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a 
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is worked of in parallel:
> {code}
>  from("direct:start")
> .to("direct:pre")
> .recipientList().method(new MyRecipientListBuilder())
> .stopOnException()
> .aggregationStrategy(new MyAggregationStrategy())
> .parallelProcessing()
> .end()
> .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange 
> newExchange) {
> if (oldExchange == null) {
> // this is the case more than once which is not expected!
> }
> // ...
> {code}
> {{oldExchange}} is null more than once which is not expected and which 
> contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here 
> the result object {{AtomicExchange}} is created which is shared during the 
> whole processing.
> If the processing should be done in parallel (as it is the case for our 
> route) then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one 
> instance of {{AggregateOnTheFlyTask}} is initialized and 
> {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}}  for 
> *every* target in the recipient list-. via 
> {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees 
> that this is only be done once)
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is 
> generated, and if aggregation is not done in parallel (as it is the case in 
> our route), {{ParallelAggregateTask.run()}}, 
> {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and 
> {{ParallelAggregateTask.doAggregateInternal()}} is invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, 
> AtomicExchange result, Exchange exchange) {
> if (strategy != null) {
> // prepare the exchanges for aggregation
> Exchange oldExchange = result.get();
> ExchangeHelper.prepareAggregation(oldExchange, exchange);
> result.set(strategy.aggregate(oldExchange, exchange));
> }
> } 
> {code}
> However, in {{ParallelAggregateTask.doAggregateInternal()}} there may occur a 
> race condition as {{result}} is shared -by every instance of 
> {{AggregateOnTheFlyTask}}- such that {{oldExchange = result.get()}} may be 
> {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} for every target in 
> recipient list is created, the {{synchronized}} method 
> {{ParallelAggregateTask.doAggregate()}} does not prevent the race 

[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

2016-12-01 Thread Alex Dettinger (JIRA)

[ 
https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15712464#comment-15712464
 ] 

Alex Dettinger commented on CAMEL-10272:


Peter, thanks for your inputs.

  As you don't use _parallelAggregate()_, it may not be the cause of this issue.
However, it's interesting to note that I could also produce _oldExchange_ to be 
null more than once with _parallelAggregate()_, even with a thread safe 
aggregation strategy.
It may be an itch to scratch in another ticket.

  Back to this issue, I will propose a PR to log such exceptions in DEBUG level 
so one could have more details when facing such a situation.
We could make a great step forward if you could reproduce the behavior once 
while logging/unwinding throwables from your aggregation strategy.
Or you could also test against master with debug log level if the PR is 
accepted.

> Aggregation is broken due to race condition in 
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 2.16.3, 2.17.3
> Environment: MacOS 10.11.6, JRE 1.7.0_79
>Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test for 
> comprehending the problem. Furthermore our (complex) unit tests are not 
> deterministic due to the root cause of the problem.
> However I tried to analyze the Camel Java code, to work out the problem. 
> Please find below my findings.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a 
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is worked of in parallel:
> {code}
>  from("direct:start")
> .to("direct:pre")
> .recipientList().method(new MyRecipientListBuilder())
> .stopOnException()
> .aggregationStrategy(new MyAggregationStrategy())
> .parallelProcessing()
> .end()
> .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange 
> newExchange) {
> if (oldExchange == null) {
> // this is the case more than once which is not expected!
> }
> // ...
> {code}
> {{oldExchange}} is null more than once which is not expected and which 
> contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here 
> the result object {{AtomicExchange}} is created which is shared during the 
> whole processing.
> If the processing should be done in parallel (as it is the case for our 
> route) then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one 
> instance of {{AggregateOnTheFlyTask}} is initialized and 
> {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}}  for 
> *every* target in the recipient list-. via 
> {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees 
> that this is only be done once)
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is 
> generated, and if aggregation is not done in parallel (as it is the case in 
> our route), {{ParallelAggregateTask.run()}}, 
> {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and 
> {{ParallelAggregateTask.doAggregateInternal()}} is invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, 
> AtomicExchange result, Exchange exchange) {
> if (strategy != null) {
> // prepare the exchanges for aggregation
> Exchange oldExchange = result.get();
> ExchangeHelper.prepareAggregation(oldExchange, exchange);
> result.set(strategy.aggregate(oldExchange, exchange));
> }
> } 
> {code}
> However, in {{ParallelAggregateTask.doAggregateInternal()}} there may occur a 
> race condition as {{result}} is shared -by every instance of 
> {{AggregateOnTheFlyTask}}- such that {{oldExchange = result.get()}} may be 
> {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} for every target in 
> recipient list is created, the {{synchronized}} method 
> {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sounds reasonably?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

2016-11-30 Thread Peter Keller (JIRA)

[ 
https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15709764#comment-15709764
 ] 

Peter Keller commented on CAMEL-10272:
--

Alex, thank you very much for the analysis. 

You are right: if an exception is thrown then {{oldExchange}} may be {{null}}. 
As this may be the case in our route, it is mysterious why the exception is 
thrown in a non-deterministic way (of course the root cause may be another 
multi-threading issue not directly correlated to Camel).

Note, if {{parallelAggregate()}} is used (as mentioned above we don't use this 
setup), then {{oldExchange}} may also be null. This can be shown using a 
debugger with setting a breakpoint in {{doAggregateInternal}} on the line with 
{{ExchangeHelper.prepareAggregation}}. Shouldn't Camel prevent {{oldExchange}} 
to be {{null}} in this case?

The logging of the (possible) exception caused by the aggregation strategy 
could help to trace down the cause. Of course this could be done in the 
implementation of the aggregation strategy itself with a try/catch clause 
without changing the Camel framework. 

Unwinding the exception to the default error handler may break existing routes 
which is not desirable. That said, I personally would prefer this possibility.


> Aggregation is broken due to race condition in 
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 2.16.3, 2.17.3
> Environment: MacOS 10.11.6, JRE 1.7.0_79
>Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test for 
> comprehending the problem. Furthermore our (complex) unit tests are not 
> deterministic due to the root cause of the problem.
> However I tried to analyze the Camel Java code, to work out the problem. 
> Please find below my findings.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a 
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is worked of in parallel:
> {code}
>  from("direct:start")
> .to("direct:pre")
> .recipientList().method(new MyRecipientListBuilder())
> .stopOnException()
> .aggregationStrategy(new MyAggregationStrategy())
> .parallelProcessing()
> .end()
> .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange 
> newExchange) {
> if (oldExchange == null) {
> // this is the case more than once which is not expected!
> }
> // ...
> {code}
> {{oldExchange}} is null more than once which is not expected and which 
> contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here 
> the result object {{AtomicExchange}} is created which is shared during the 
> whole processing.
> If the processing should be done in parallel (as it is the case for our 
> route) then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one 
> instance of {{AggregateOnTheFlyTask}} is initialized and 
> {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}}  for 
> *every* target in the recipient list-. via 
> {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees 
> that this is only be done once)
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is 
> generated, and if aggregation is not done in parallel (as it is the case in 
> our route), {{ParallelAggregateTask.run()}}, 
> {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and 
> {{ParallelAggregateTask.doAggregateInternal()}} is invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, 
> AtomicExchange result, Exchange exchange) {
> if (strategy != null) {
> // prepare the exchanges for aggregation
> Exchange oldExchange = result.get();
> ExchangeHelper.prepareAggregation(oldExchange, exchange);
> result.set(strategy.aggregate(oldExchange, exchange));
> }
> } 
> {code}
> However, in {{ParallelAggregateTask.doAggregateInternal()}} there may occur a 
> race condition as {{result}} is shared -by every instance of 
> {{AggregateOnTheFlyTask}}- such that {{oldExchange = result.get()}} may be 
> {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} for every target in 
> recipient list is created, the {{synchronized}} method 
> {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sounds 

[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15694189#comment-15694189
 ] 

ASF GitHub Bot commented on CAMEL-10272:


Github user aldettinger closed the pull request at:

https://github.com/apache/camel/pull/1310


> Aggregation is broken due to race condition in 
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 2.16.3, 2.17.3
> Environment: MacOS 10.11.6, JRE 1.7.0_79
>Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test for 
> comprehending the problem. Furthermore our (complex) unit tests are not 
> deterministic due to the root cause of the problem.
> However I tried to analyze the Camel Java code, to work out the problem. 
> Please find below my findings.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a 
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is worked of in parallel:
> {code}
>  from("direct:start")
> .to("direct:pre")
> .recipientList().method(new MyRecipientListBuilder())
> .stopOnException()
> .aggregationStrategy(new MyAggregationStrategy())
> .parallelProcessing()
> .end()
> .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange 
> newExchange) {
> if (oldExchange == null) {
> // this is the case more than once which is not expected!
> }
> // ...
> {code}
> {{oldExchange}} is null more than once which is not expected and which 
> contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here 
> the result object {{AtomicExchange}} is created which is shared during the 
> whole processing.
> If the processing should be done in parallel (as it is the case for our 
> route) then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one 
> instance of {{AggregateOnTheFlyTask}} is initialized and 
> {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}}  for 
> *every* target in the recipient list-. via 
> {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees 
> that this is only be done once)
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is 
> generated, and if aggregation is not done in parallel (as it is the case in 
> our route), {{ParallelAggregateTask.run()}}, 
> {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and 
> {{ParallelAggregateTask.doAggregateInternal()}} is invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, 
> AtomicExchange result, Exchange exchange) {
> if (strategy != null) {
> // prepare the exchanges for aggregation
> Exchange oldExchange = result.get();
> ExchangeHelper.prepareAggregation(oldExchange, exchange);
> result.set(strategy.aggregate(oldExchange, exchange));
> }
> } 
> {code}
> However, in {{ParallelAggregateTask.doAggregateInternal()}} there may occur a 
> race condition as {{result}} is shared -by every instance of 
> {{AggregateOnTheFlyTask}}- such that {{oldExchange = result.get()}} may be 
> {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} for every target in 
> recipient list is created, the {{synchronized}} method 
> {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sounds reasonably?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

2016-11-24 Thread Alex Dettinger (JIRA)

[ 
https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15693983#comment-15693983
 ] 

Alex Dettinger commented on CAMEL-10272:


I don't think that we are facing such a simple race condition here. In the 
method _doAggregateInternal(...)_, _result.get(...)_ and _result.set(...)_ are 
not run concurrently.
A single thread is used at aggregation time, in the provided example at least.

However, _oldExchange_ could be null more than once when the user custom 
aggregation strategy throws a runtime exception:
{code}
public static class Router {
public String[] routeTo() {
return new String[] {"log:MSGROUTER_0?level=TRACE", 
"log:MSGROUTER_1?level=TRACE"};
}
}

@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("direct:start").
recipientList().
method(new Router()).
aggregationStrategy(new AggregationStrategy(){
public Exchange aggregate(Exchange oldExchange, Exchange 
newExchange) {
if (oldExchange == null) {
System.out.println("oldExchange is null 
"+newExchange.getExchangeId()+" thread: "+Thread.currentThread());
}
System.out.println(3/0); // throws 
java.lang.ArithmeticException
return null;
}
}).
parallelProcessing().
end();
}
};
}
{code}
outputs:
{noformat}
oldExchange is null ID-alex-42036-1479994398470-0-3 thread: Thread[Camel 
(camel-1) thread #2 - RecipientList-AggregateTask,5,main]
oldExchange is null ID-alex-42036-1479994398470-0-4 thread: Thread[Camel 
(camel-1) thread #2 - RecipientList-AggregateTask,5,main]
{noformat}
That said, I see the following drawbacks with the current implementation:
* The AggregationStrategy class javadoc lists a single case where oldExchange 
could be null whereas two exists
* This case could be difficult to debug from a camel user perspective. Camel 
kind of hides the runtime exception in the custom aggregation strategy.

>From there, I see 2 paths to handle a runtime exception unwind from a custom 
>aggregation strategy:
* Produce an ERROR log. We then need to correct the javadoc accordingly.
* Unwind the exception to the default error handler. Note that this is what 
happen when parallelProcessing is false.

Any thoughts on the right path then ?

> Aggregation is broken due to race condition in 
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 2.16.3, 2.17.3
> Environment: MacOS 10.11.6, JRE 1.7.0_79
>Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test for 
> comprehending the problem. Furthermore our (complex) unit tests are not 
> deterministic due to the root cause of the problem.
> However I tried to analyze the Camel Java code, to work out the problem. 
> Please find below my findings.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a 
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is worked of in parallel:
> {code}
>  from("direct:start")
> .to("direct:pre")
> .recipientList().method(new MyRecipientListBuilder())
> .stopOnException()
> .aggregationStrategy(new MyAggregationStrategy())
> .parallelProcessing()
> .end()
> .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange 
> newExchange) {
> if (oldExchange == null) {
> // this is the case more than once which is not expected!
> }
> // ...
> {code}
> {{oldExchange}} is null more than once which is not expected and which 
> contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here 
> the result object {{AtomicExchange}} is created which is shared during the 
> whole processing.
> If the processing should be done in parallel (as it is the case for our 
> route) then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one 
> instance of {{AggregateOnTheFlyTask}} is initialized and 
> {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}}  for 
> *every* target in the recipient list-. via 
> 

[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

2016-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15693955#comment-15693955
 ] 

ASF GitHub Bot commented on CAMEL-10272:


GitHub user aldettinger opened a pull request:

https://github.com/apache/camel/pull/1310

CAMEL 10272: Fixed few comments

Note that this PR does not fix 
[CAMEL-10272](https://issues.apache.org/jira/browse/CAMEL-10272). I'll post on 
the issue tracker directly to details my findings.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aldettinger/camel master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/camel/pull/1310.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1310


commit 42efb4ef220beb26e8a49c7e729c81d3e0054a63
Author: aldettinger 
Date:   2016-11-24T18:00:02Z

CAMEL 10272: Fixed few comments




> Aggregation is broken due to race condition in 
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 2.16.3, 2.17.3
> Environment: MacOS 10.11.6, JRE 1.7.0_79
>Reporter: Peter Keller
>
> Unfortunately, I am not able to provide a (simple) unit test for 
> comprehending the problem. Furthermore our (complex) unit tests are not 
> deterministic due to the root cause of the problem.
> However I tried to analyze the Camel Java code, to work out the problem. 
> Please find below my findings.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a 
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is worked of in parallel:
> {code}
>  from("direct:start")
> .to("direct:pre")
> .recipientList().method(new MyRecipientListBuilder())
> .stopOnException()
> .aggregationStrategy(new MyAggregationStrategy())
> .parallelProcessing()
> .end()
> .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange 
> newExchange) {
> if (oldExchange == null) {
> // this is the case more than once which is not expected!
> }
> // ...
> {code}
> {{oldExchange}} is null more than once which is not expected and which 
> contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here 
> the result object {{AtomicExchange}} is created which is shared during the 
> whole processing.
> If the processing should be done in parallel (as it is the case for our 
> route) then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one 
> instance of {{AggregateOnTheFlyTask}} is initialized and 
> {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}}  for 
> *every* target in the recipient list-. via 
> {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees 
> that this is only be done once)
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is 
> generated, and if aggregation is not done in parallel (as it is the case in 
> our route), {{ParallelAggregateTask.run()}}, 
> {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and 
> {{ParallelAggregateTask.doAggregateInternal()}} is invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, 
> AtomicExchange result, Exchange exchange) {
> if (strategy != null) {
> // prepare the exchanges for aggregation
> Exchange oldExchange = result.get();
> ExchangeHelper.prepareAggregation(oldExchange, exchange);
> result.set(strategy.aggregate(oldExchange, exchange));
> }
> } 
> {code}
> However, in {{ParallelAggregateTask.doAggregateInternal()}} there may occur a 
> race condition as {{result}} is shared -by every instance of 
> {{AggregateOnTheFlyTask}}- such that {{oldExchange = result.get()}} may be 
> {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} for every target in 
> recipient list is created, the {{synchronized}} method 
> {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sounds reasonably?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

2016-09-01 Thread Peter Keller (JIRA)

[ 
https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15455190#comment-15455190
 ] 

Peter Keller commented on CAMEL-10272:
--

I updated the analysis: There is only one instance of {{AggregateOnTheFlyTask}}.

> Aggregation is broken due to race condition in 
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 2.16.3, 2.17.3
> Environment: MacOS 10.11.6, JRE 1.7.0_79
>Reporter: Peter Keller
>Priority: Critical
>
> Unfortunately, I am not able to provide a (simple) unit test for 
> comprehending the problem. Furthermore our (complex) unit tests are not 
> deterministic due to the root cause of the problem.
> However I tried to analyze the Camel Java code, to work out the problem. 
> Please find below my findings.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a 
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is worked of in parallel:
> {code}
>  from("direct:start")
> .to("direct:pre")
> .recipientList().method(new MyRecipientListBuilder())
> .stopOnException()
> .aggregationStrategy(new MyAggregationStrategy())
> .parallelProcessing()
> .end()
> .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange 
> newExchange) {
> if (oldExchange == null) {
> // this is the case more than once which is not expected!
> }
> // ...
> {code}
> {{oldExchange}} is null more than once which is not expected and which 
> contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here 
> the result object {{AtomicExchange}} is created which is shared during the 
> whole processing.
> If the processing should be done in parallel (as it is the case for our 
> route) then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one 
> instance of {{AggregateOnTheFlyTask}} is initialized and 
> {{aggregateOnTheFly()}} is invoked -*asynchronously* via {{run()}}  for 
> *every* target in the recipient list-. via 
> {{aggregateExecutorService.submit}} ({{aggregationTaskSubmitted}} guarantees 
> that this is only be done once)
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is 
> generated, and if aggregation is not done in parallel (as it is the case in 
> our route), {{ParallelAggregateTask.run()}}, 
> {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and 
> {{ParallelAggregateTask.doAggregateInternal()}} is invoked synchronously:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, 
> AtomicExchange result, Exchange exchange) {
> if (strategy != null) {
> // prepare the exchanges for aggregation
> Exchange oldExchange = result.get();
> ExchangeHelper.prepareAggregation(oldExchange, exchange);
> result.set(strategy.aggregate(oldExchange, exchange));
> }
> } 
> {code}
> However, in {{ParallelAggregateTask.doAggregateInternal()}} there may occur a 
> race condition as {{result}} is shared -by every instance of 
> {{AggregateOnTheFlyTask}}- such that {{oldExchange = result.get()}} may be 
> {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} for every target in 
> recipient list is created, the {{synchronized}} method 
> {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sounds reasonably?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (CAMEL-10272) Aggregation is broken due to race condition in ParallelAggregateTask.doAggregateInternal()

2016-08-29 Thread Ushnash Shukla (JIRA)

[ 
https://issues.apache.org/jira/browse/CAMEL-10272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15446658#comment-15446658
 ] 

Ushnash Shukla commented on CAMEL-10272:


I'll take a look at this.

> Aggregation is broken due to race condition in 
> ParallelAggregateTask.doAggregateInternal()
> --
>
> Key: CAMEL-10272
> URL: https://issues.apache.org/jira/browse/CAMEL-10272
> Project: Camel
>  Issue Type: Bug
>  Components: camel-core
>Affects Versions: 2.16.3, 2.17.3
>Reporter: Peter Keller
>Priority: Critical
>
> Unfortunately, I am not able to provide a (simple) unit test for 
> comprehending the problem. Furthermore our (complex) unit tests are not 
> determinist due to the root cause of the problem.
> However I tried to analyze the Camel Java code, to work out the problem. 
> Please find below my findings.
> h3. Problem
> The {{oldExchange}} is {{null}} more than once in the aggregator if a 
> recipient list is processed in parallel.
> h3. Camel route
> In my Camel route, a recipient list is worked of in parallel:
> {code}
>  from("direct:start")
> .to("direct:pre")
> .recipientList().method(new MyRecipientListBuilder())
> .stopOnException()
> .aggregationStrategy(new MyAggregationStrategy())
> .parallelProcessing()
> .end()
> .bean(new MyPostProcessor());
> {code}
> Snippet of {{MyAggregationStrategy}}:
> {code}
> @Override
> @SuppressWarnings("unchecked")
> public Exchange aggregate(final Exchange oldExchange, final Exchange 
> newExchange) {
> if (oldExchange == null) {
> // this is the case more than once which is not expected!
> }
> // ...
> {code}
> {{oldExchange}} is null more than once which is not expected and which 
> contradicts the contract with Camel.
> h3. Analysis
> During the processing, Camel invokes {{MulticastProcessor.process()}}. Here 
> the result object {{AtomicExchange}} is created which is shared during the 
> whole processing.
> If the processing should be done in parallel (as it is the case for our 
> route) then {{MulticastProcessor.doProcessParallel()}} is invoked. Here one 
> instance of {{AggregateOnTheFlyTask}} is initialized and 
> {{aggregateOnTheFly()}} is invoked *asynchronously* via {{run()}}  for 
> *every* target in the recipient list.
> In {{aggregateOnTheFly()}}, a new instance of {{ParallelAggregateTask}} is 
> generated, and if aggregation is not done in parallel (as it is the case in 
> our route), {{ParallelAggregateTask.run()}}, 
> {{ParallelAggregateTask.doAggregate()}} (this method is synchronized), and 
> {{ParallelAggregateTask.doAggregateInternal()}} is invoked:
> {code}
> protected void doAggregateInternal(AggregationStrategy strategy, 
> AtomicExchange result, Exchange exchange) {
> if (strategy != null) {
> // prepare the exchanges for aggregation
> Exchange oldExchange = result.get();
> ExchangeHelper.prepareAggregation(oldExchange, exchange);
> result.set(strategy.aggregate(oldExchange, exchange));
> }
> } 
> {code}
> However, in {{ParallelAggregateTask.doAggregateInternal()}} there may occur a 
> race condition as {{result}} is shared by every instance of 
> {{AggregateOnTheFlyTask}} such that {{oldExchange = result.get()}} 
> may be {{null}} more than once!
> Note: As a new instance of {{ParallelAggregateTask}} for every target in 
> recipient list is created, the {{synchronized}} method 
> {{ParallelAggregateTask.doAggregate()}} does not prevent the race condition!
> Does this sounds reasonably?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)