[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator

2019-08-15 Thread michael elbaz (JIRA)


[ 
https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16908423#comment-16908423
 ] 

michael elbaz commented on CAMEL-12906:
---

There is someone have any idea about ? because it cause losing of messages

> Strange comportement with aggregator
> 
>
> Key: CAMEL-12906
> URL: https://issues.apache.org/jira/browse/CAMEL-12906
> Project: Camel
>  Issue Type: Bug
>  Components: camel-activemq
>Affects Versions: 2.22.1
>Reporter: michael elbaz
>Priority: Minor
>
> /!\ Its about camel aggregator and error handling during *shutdown phase*
> My case is about graceful shutdown and error handling in case of aggregator 
> first one:
> In this example i will get *RejectedExecutionException* during the shutdown 
> and the exception is not catched by the *onexception* or *errorHandler* 
> methods as expected so i will just lost message.
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
> errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> System.err.println("push to amq"));
> errorHandler(deadLetterChannel("log:dead?level=ERROR"));
> }
> private static String amq() {
> String amq = "activemq:data";
> amq += "?transacted=true";
> return amq;
> }
> }
> {code}
> Stacktrace:
> {code:java}
> java.util.concurrent.RejectedExecutionException: null
>   at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_131]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_131]
>   at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
> {code}
> In this one (+using doTry before aggregate+) there is no exception throwed, 
> that the interesting point because i expect at least catched 
> *RejectedExecutionException* and finally i *didn't lost* message in this case 
> !
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
>// errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .doTry() // Strange /!\
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> System.err.println("push to amq"));
> errorHandler(deadLetterChannel("log:dead?level=ERROR"));
> }
> private static String amq() {
> String amq = "activemq:data";
> amq += "?transacted=true";
>

[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator

2019-08-01 Thread michael elbaz (JIRA)


[ 
https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897868#comment-16897868
 ] 

michael elbaz commented on CAMEL-12906:
---

Hi [~davsclaus] could you provide me a link or basic example of this please ?

> Strange comportement with aggregator
> 
>
> Key: CAMEL-12906
> URL: https://issues.apache.org/jira/browse/CAMEL-12906
> Project: Camel
>  Issue Type: Bug
>  Components: camel-activemq
>Affects Versions: 2.22.1
>Reporter: michael elbaz
>Priority: Major
>
> /!\ Its about camel aggregator and error handling during *shutdown phase*
> My case is about graceful shutdown and error handling in case of aggregator 
> first one:
> In this example i will get *RejectedExecutionException* during the shutdown 
> and the exception is not catched by the *onexception* or *errorHandler* 
> methods as expected so i will just lost message.
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
> errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> System.err.println("push to amq"));
> errorHandler(deadLetterChannel("log:dead?level=ERROR"));
> }
> private static String amq() {
> String amq = "activemq:data";
> amq += "?transacted=true";
> return amq;
> }
> }
> {code}
> Stacktrace:
> {code:java}
> java.util.concurrent.RejectedExecutionException: null
>   at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_131]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_131]
>   at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
> {code}
> In this one (+using doTry before aggregate+) there is no exception throwed, 
> that the interesting point because i expect at least catched 
> *RejectedExecutionException* and finally i *didn't lost* message in this case 
> !
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
>// errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .doTry() // Strange /!\
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> System.err.println("push to amq"));
> errorHandler(deadLetterChannel("log:dead?level=ERROR"));
> }
> private static String amq() {
> String amq = "activemq:data";
> amq += "?transacted=true"

[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator

2019-07-31 Thread Claus Ibsen (JIRA)


[ 
https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897801#comment-16897801
 ] 

Claus Ibsen commented on CAMEL-12906:
-

You need in spring to set those spring beans for your databases etc to have 
depends-on camel-context so they are shutdown after that.

> Strange comportement with aggregator
> 
>
> Key: CAMEL-12906
> URL: https://issues.apache.org/jira/browse/CAMEL-12906
> Project: Camel
>  Issue Type: Bug
>  Components: camel-activemq
>Affects Versions: 2.22.1
>Reporter: michael elbaz
>Priority: Major
>
> /!\ Its about camel aggregator and error handling during *shutdown phase*
> My case is about graceful shutdown and error handling in case of aggregator 
> first one:
> In this example i will get *RejectedExecutionException* during the shutdown 
> and the exception is not catched by the *onexception* or *errorHandler* 
> methods as expected so i will just lost message.
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
> errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> System.err.println("push to amq"));
> errorHandler(deadLetterChannel("log:dead?level=ERROR"));
> }
> private static String amq() {
> String amq = "activemq:data";
> amq += "?transacted=true";
> return amq;
> }
> }
> {code}
> Stacktrace:
> {code:java}
> java.util.concurrent.RejectedExecutionException: null
>   at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_131]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_131]
>   at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
> {code}
> In this one (+using doTry before aggregate+) there is no exception throwed, 
> that the interesting point because i expect at least catched 
> *RejectedExecutionException* and finally i *didn't lost* message in this case 
> !
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
>// errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .doTry() // Strange /!\
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> System.err.println("push to amq"));
> errorHandler(deadLetterChannel("log:dead?level=ERROR"));
> }
> private static String amq() {
> String amq =

[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator

2019-07-22 Thread michael elbaz (JIRA)


[ 
https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16890009#comment-16890009
 ] 

michael elbaz commented on CAMEL-12906:
---

[~dmvolod] What about this point ?

> Strange comportement with aggregator
> 
>
> Key: CAMEL-12906
> URL: https://issues.apache.org/jira/browse/CAMEL-12906
> Project: Camel
>  Issue Type: Bug
>  Components: camel-activemq
>Affects Versions: 2.22.1
>Reporter: michael elbaz
>Priority: Major
>
> /!\ Its about camel aggregator and error handling during *shutdown phase*
> My case is about graceful shutdown and error handling in case of aggregator 
> first one:
> In this example i will get *RejectedExecutionException* during the shutdown 
> and the exception is not catched by the *onexception* or *errorHandler* 
> methods as expected so i will just lost message.
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
> errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> System.err.println("push to amq"));
> errorHandler(deadLetterChannel("log:dead?level=ERROR"));
> }
> private static String amq() {
> String amq = "activemq:data";
> amq += "?transacted=true";
> return amq;
> }
> }
> {code}
> Stacktrace:
> {code:java}
> java.util.concurrent.RejectedExecutionException: null
>   at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_131]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_131]
>   at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
> {code}
> In this one (+using doTry before aggregate+) there is no exception throwed, 
> that the interesting point because i expect at least catched 
> *RejectedExecutionException* and finally i *didn't lost* message in this case 
> !
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
>// errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .doTry() // Strange /!\
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> System.err.println("push to amq"));
> errorHandler(deadLetterChannel("log:dead?level=ERROR"));
> }
> private static String amq() {
> String amq = "activemq:data";
> amq += "?transacted=true";
> return amq;
> }
> }
> {code

[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator

2018-10-30 Thread michael elbaz (JIRA)


[ 
https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668398#comment-16668398
 ] 

michael elbaz commented on CAMEL-12906:
---

Yeah it's seems in my opinion there is something unclear/wrong with the 
shutdown, and also when using Camel with spring the spring context also 
shutdown at the same time for example spring container will close the 
datasource Immediately so I loose my message if I'm trying to write in database

> Strange comportement with aggregator
> 
>
> Key: CAMEL-12906
> URL: https://issues.apache.org/jira/browse/CAMEL-12906
> Project: Camel
>  Issue Type: Bug
>  Components: camel-activemq
>Affects Versions: 2.22.1
>Reporter: michael elbaz
>Priority: Major
>
> /!\ Its about camel aggregator and error handling during *shutdown phase*
> My case is about graceful shutdown and error handling in case of aggregator 
> first one:
> In this example i will get *RejectedExecutionException* during the shutdown 
> and the exception is not catched by the *onexception* or *errorHandler* 
> methods as expected so i will just lost message.
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
> errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> System.err.println("push to amq"));
> errorHandler(deadLetterChannel("log:dead?level=ERROR"));
> }
> private static String amq() {
> String amq = "activemq:data";
> amq += "?transacted=true";
> return amq;
> }
> }
> {code}
> Stacktrace:
> {code:java}
> java.util.concurrent.RejectedExecutionException: null
>   at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_131]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_131]
>   at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
> {code}
> In this one (+using doTry before aggregate+) there is no exception throwed, 
> that the interesting point because i expect at least catched 
> *RejectedExecutionException* and finally i *didn't lost* message in this case 
> !
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
>// errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .doTry() // Strange /!\
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> Sys

[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator

2018-10-29 Thread Dmitry Volodin (JIRA)


[ 
https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1953#comment-1953
 ] 

Dmitry Volodin commented on CAMEL-12906:


[~michael992], CAMEL-12668 can be related to your issue, please look at this.

> Strange comportement with aggregator
> 
>
> Key: CAMEL-12906
> URL: https://issues.apache.org/jira/browse/CAMEL-12906
> Project: Camel
>  Issue Type: Bug
>  Components: camel-activemq
>Affects Versions: 2.22.1
>Reporter: michael elbaz
>Priority: Major
>
> /!\ Its about camel aggregator and error handling during *shutdown phase*
> My case is about graceful shutdown and error handling in case of aggregator 
> first one:
> In this example i will get *RejectedExecutionException* during the shutdown 
> and the exception is not catched by the *onexception* or *errorHandler* 
> methods as expected so i will just lost message.
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
> errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> System.err.println("push to amq"));
> errorHandler(deadLetterChannel("log:dead?level=ERROR"));
> }
> private static String amq() {
> String amq = "activemq:data";
> amq += "?transacted=true";
> return amq;
> }
> }
> {code}
> Stacktrace:
> {code:java}
> java.util.concurrent.RejectedExecutionException: null
>   at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791)
>  ~[camel-core-2.22.1.jar:2.22.1]
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_131]
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
> [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_131]
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_131]
>   at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
> {code}
> In this one (+using doTry before aggregate+) there is no exception throwed, 
> that the interesting point because i expect at least catched 
> *RejectedExecutionException* and finally i *didn't lost* message in this case 
> !
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
> @Override
> public void configure() throws Exception {
>// errors();
> from("timer:foo?period=1000")
> .log("1")
> .transform().body(() -> "DATA " + 
> RandomStringUtils.randomAlphanumeric(10))
> .convertBodyTo(String.class)
> .to(amq());
> from(amq()).to("direct:foo");
> from("direct:foo")
> .delay(3000)
> .doTry() // Strange /!\
> .aggregate(constant(true), new 
> GroupedBodyAggregationStrategy())
> .completionSize(10)
> .forceCompletionOnStop()
> .log("${body}");
> }
> public void errors() {
> onException(Exception.class)
> .useOriginalMessage()
> .to("activemq:recovery")
> .handled(true)
> .onRedelivery(exchange -> System.err.println("push to amq"));
> errorHandler(deadLetterChannel("log:dead?level=ERROR"));
> }
> private static String amq() {
> String amq = "activemq:data";
> amq += "?transacted=tru