[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator
    [ https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16908423#comment-16908423 ]

michael elbaz commented on CAMEL-12906:
---------------------------------------

Does anyone have any idea about this? It causes messages to be lost.

> Strange comportement with aggregator
> ------------------------------------
>
>                 Key: CAMEL-12906
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12906
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-activemq
>    Affects Versions: 2.22.1
>            Reporter: michael elbaz
>            Priority: Minor
>
> /!\ This is about the Camel aggregator and error handling during the *shutdown phase*.
> My case concerns graceful shutdown and error handling when an aggregator is used.
> First case:
> In this example I get a *RejectedExecutionException* during shutdown, and the exception is not caught by the *onException* or *errorHandler* definitions as expected, so the message is simply lost.
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         errors();
>         from("timer:foo?period=1000")
>                 .log("1")
>                 .transform().body(() -> "DATA " + RandomStringUtils.randomAlphanumeric(10))
>                 .convertBodyTo(String.class)
>                 .to(amq());
>         from(amq()).to("direct:foo");
>         from("direct:foo")
>                 .delay(3000)
>                 .aggregate(constant(true), new GroupedBodyAggregationStrategy())
>                 .completionSize(10)
>                 .forceCompletionOnStop()
>                 .log("${body}");
>     }
>     public void errors() {
>         onException(Exception.class)
>                 .useOriginalMessage()
>                 .to("activemq:recovery")
>                 .handled(true)
>                 .onRedelivery(exchange -> System.err.println("push to amq"));
>         errorHandler(deadLetterChannel("log:dead?level=ERROR"));
>     }
>     private static String amq() {
>         String amq = "activemq:data";
>         amq += "?transacted=true";
>         return amq;
>     }
> }
> {code}
> Stacktrace:
> {code:java}
> java.util.concurrent.RejectedExecutionException: null
>     at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:435) ~[camel-core-2.22.1.jar:2.22.1]
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[camel-core-2.22.1.jar:2.22.1]
>     at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[camel-core-2.22.1.jar:2.22.1]
>     at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97) ~[camel-core-2.22.1.jar:2.22.1]
>     at org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:791) ~[camel-core-2.22.1.jar:2.22.1]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
>     at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
> {code}
> In this second case (+using doTry before aggregate+) no exception is thrown. That is the interesting point, because I expected at least a caught *RejectedExecutionException*, and in the end I *didn't lose* the message in this case!
> {code:java}
> @Component
> public class AmqRoute extends RouteBuilder {
>     @Override
>     public void configure() throws Exception {
>         // errors();
>         from("timer:foo?period=1000")
>                 .log("1")
>                 .transform().body(() -> "DATA " + RandomStringUtils.randomAlphanumeric(10))
>                 .convertBodyTo(String.class)
>                 .to(amq());
>         from(amq()).to("direct:foo");
>         from("direct:foo")
>                 .delay(3000)
>                 .doTry() // Strange /!\
>                 .aggregate(constant(true), new GroupedBodyAggregationStrategy())
>                 .completionSize(10)
>                 .forceCompletionOnStop()
>                 .log("${body}");
>     }
>     public void errors() {
>         onException(Exception.class)
>                 .useOriginalMessage()
>                 .to("activemq:recovery")
>                 .handled(true)
>                 .onRedelivery(exchange -> System.err.println("push to amq"));
>         errorHandler(deadLetterChannel("log:dead?level=ERROR"));
>     }
>     private static String amq() {
>         String amq = "activemq:data";
>         amq += "?transacted=true";
>         return amq;
>     }
> }
> {code}
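The report above turns on what happens to work that is handed over while a component is already shutting down. The sketch below illustrates only the general pattern with a plain `java.util.concurrent` executor. It is not Camel's implementation: the trace above originates in Camel's `RedeliveryErrorHandler`, not in the JDK thread pool. The class name `ShutdownRejectionSketch` and the `processed` / `recovered` lists are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not Camel code: shows that work submitted after
// shutdown() is rejected, and that the caller has to catch the rejection
// itself if the message must not be lost.
public class ShutdownRejectionSketch {

    // Messages handled normally by the worker thread.
    static final List<String> processed = Collections.synchronizedList(new ArrayList<>());
    // Messages kept aside because the executor refused them.
    static final List<String> recovered = Collections.synchronizedList(new ArrayList<>());

    static void send(ExecutorService pool, String message) {
        try {
            pool.execute(() -> processed.add(message));
        } catch (RejectedExecutionException e) {
            // The pool no longer accepts work: keep the message instead of dropping it.
            recovered.add(message);
        }
    }

    static void run() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        send(pool, "before-shutdown");
        pool.shutdown();                 // graceful: already queued tasks still run
        send(pool, "after-shutdown");    // refused by the default rejection policy
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        run();
        System.out.println("processed=" + processed + " recovered=" + recovered);
    }
}
```

In this model the rejected message survives only because the submitting code catches the exception at the hand-over point. That is the role the reporter expected `onException` / `errorHandler` to play for the aggregator's completion task during shutdown.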
[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator
    [ https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897868#comment-16897868 ]

michael elbaz commented on CAMEL-12906:
---------------------------------------

Hi [~davsclaus], could you provide me with a link or a basic example of this, please?

> Strange comportement with aggregator
> ------------------------------------
>
>                 Key: CAMEL-12906
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12906
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-activemq
>    Affects Versions: 2.22.1
>            Reporter: michael elbaz
>            Priority: Major
[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator
    [ https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16897801#comment-16897801 ]

Claus Ibsen commented on CAMEL-12906:
-------------------------------------

In Spring you need to set those Spring beans for your databases etc. to have depends-on camel-context so that they are shut down after it.

> Strange comportement with aggregator
> ------------------------------------
>
>                 Key: CAMEL-12906
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12906
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-activemq
>    Affects Versions: 2.22.1
>            Reporter: michael elbaz
>            Priority: Major
[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator
    [ https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16890009#comment-16890009 ]

michael elbaz commented on CAMEL-12906:
---------------------------------------

[~dmvolod] What about this point?

> Strange comportement with aggregator
> ------------------------------------
>
>                 Key: CAMEL-12906
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12906
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-activemq
>    Affects Versions: 2.22.1
>            Reporter: michael elbaz
>            Priority: Major
[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator
    [ https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668398#comment-16668398 ]

michael elbaz commented on CAMEL-12906:
---------------------------------------

Yes, in my opinion there seems to be something unclear or wrong with the shutdown. Also, when using Camel with Spring, the Spring context shuts down at the same time; for example, the Spring container closes the datasource immediately, so I lose my message if I'm trying to write to the database.

> Strange comportement with aggregator
> ------------------------------------
>
>                 Key: CAMEL-12906
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12906
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-activemq
>    Affects Versions: 2.22.1
>            Reporter: michael elbaz
>            Priority: Major
[jira] [Commented] (CAMEL-12906) Strange comportement with aggregator
    [ https://issues.apache.org/jira/browse/CAMEL-12906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1953#comment-1953 ]

Dmitry Volodin commented on CAMEL-12906:
----------------------------------------

[~michael992], CAMEL-12668 may be related to your issue; please take a look at it.

> Strange comportement with aggregator
> ------------------------------------
>
>                 Key: CAMEL-12906
>                 URL: https://issues.apache.org/jira/browse/CAMEL-12906
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-activemq
>    Affects Versions: 2.22.1
>            Reporter: michael elbaz
>            Priority: Major