I'm in a scenario where I have one input subscription to a message queue. It's prohibitively expensive to subscribe twice, but I would like to take two separate actions for the messages I receive. I'm using multicast to do so, but I would also like to aggregate on one of those routes. Multicasting to two direct routes and then aggregating on one works just fine until it comes time to acknowledge the message from the input subscription.
I should only acknowledge the message once both downstream actions are completed. Both are idempotent so redelivery isn't a problem, but dropping data due to an early acknowledgment followed by process failure is a problem. My problem is that I can't find a simple way to synchronize on the completed state of the first direct route and the completed processing after aggregating on the second direct route. In other applications we use end() after multicast() to synchronize on the completions of downstream routes, and that does work.
I'm attaching three minimal examples demonstrating the behavior I've described. The first example is set up as my application is set up. Ideally I can get this example to pass its assertion without too much extra configuration. The second example is set up as our other applications are set up, and the acknowledgment after multicast correctly waits until all downstream processing is complete. The third example is the only way I've found to achieve proper synchronization, but it involves tracking completion state, stitching messages back together, and aggregating until I've received each message back as many times as I multicast it. It does work, but it is ugly.
My question is: Is there a way to acknowledge from the end of the multicast declaration even when using aggregation? Or is the third example the best way to ensure I don't acknowledge messages too early?
This is tested with Camel 4.10.0.
Thanks,
Andrew

Examples:

package com.example;

import static com.google.common.truth.Truth.assertThat;

import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.StringAggregationStrategy;
import org.junit.jupiter.api.Test;

public class CamelAggregationExamples {

  private final LinkedBlockingQueue<String> resultQueue = new LinkedBlockingQueue<>();

  @Test
  public void test_naive_ack_after_multicast_with_aggregation() throws Exception {
    CamelContext context = new DefaultCamelContext();
    context.addRoutes(
        new RouteBuilder() {
          @Override
          public void configure() {
            from("direct:start")
                .log("Receiving message: ${body}")
                .multicast()
                  .to("direct:a", "direct:b")
                .end()
                .log("multicast complete")
                .process(e -> resultQueue.add("ack message; multicast complete: " + e.getIn().getBody()));

            from("direct:a")
                .log("Receiving by a: ${body}")
                .process(e -> {
                  TimeUnit.SECONDS.sleep(1);
                  resultQueue.add("direct route complete: " + e.getIn().getBody());
                });

            from("direct:b")
                .aggregate(constant(true), new StringAggregationStrategy().delimiter("; "))
                .completionSize(2)
                .log("Aggregated body: ${body}")
                .process(e -> {
                  TimeUnit.SECONDS.sleep(4);
                  resultQueue.add("aggregation processing complete: " + e.getIn().getBody());
                });
          }
        }
    );
    context.start();

    context.createProducerTemplate().sendBody("direct:start", "Hello!");
    context.createProducerTemplate().sendBody("direct:start", "World!");
    TimeUnit.SECONDS.sleep(10);

    // actual: [
    //   direct route complete: Hello!,
    //   ack message; multicast complete: Hello!,
    //   direct route complete: World!,
    //   ack message; multicast complete: World!,
    //   aggregation processing complete: Hello!; World!
    // ]
    assertThat(resultQueue).containsExactly(
        "direct route complete: Hello!",
        "direct route complete: World!",
        "aggregation processing complete: Hello!; World!",
        "ack message; multicast complete: Hello!",
        "ack message; multicast complete: World!"
    ).inOrder();
  }

  @Test
  public void test_naive_ack_without_aggregation() throws Exception {
    CamelContext context = new DefaultCamelContext();
    context.addRoutes(
        new RouteBuilder() {
          @Override
          public void configure() {
            from("direct:start")
                .log("Receiving message: ${body}")
                .multicast()
                  .to("direct:a", "direct:b")
                .end()
                .log("multicast complete")
                .process(e -> resultQueue.add("ack message; multicast complete: " + e.getIn().getBody()));

            from("direct:a")
                .log("Receiving by a: ${body}")
                .process(e -> {
                  TimeUnit.SECONDS.sleep(1);
                  resultQueue.add("direct route a complete: " + e.getIn().getBody());
                });

            from("direct:b")
                .process(e -> {
                  TimeUnit.SECONDS.sleep(4);
                  resultQueue.add("direct route b complete: " + e.getIn().getBody());
                });
          }
        }
    );
    context.start();

    context.createProducerTemplate().sendBody("direct:start", "Hello!");
    context.createProducerTemplate().sendBody("direct:start", "World!");
    TimeUnit.SECONDS.sleep(10);

    // this is correct
    assertThat(resultQueue).containsExactly(
        "direct route a complete: Hello!",
        "direct route b complete: Hello!",
        "ack message; multicast complete: Hello!",
        "direct route a complete: World!",
        "direct route b complete: World!",
        "ack message; multicast complete: World!"
    ).inOrder();
  }

  @Test
  public void test_complex_ack_after_multicast_aggregation() throws Exception {
    CamelContext context = new DefaultCamelContext();
    context.addRoutes(
        new RouteBuilder() {
          @Override
          public void configure() {
            from("direct:start")
                .log("Receiving message: ${body}")
                .multicast()
                  .to("direct:a", "direct:b")
                .end()
                .log("multicast complete");

            from("direct:a")
                .log("Receiving by a: ${body}")
                .process(e -> {
                  TimeUnit.SECONDS.sleep(1);
                  resultQueue.add("direct route complete: " + e.getIn().getBody());
                })
                .to("direct:ack");

            from("direct:b")
                .aggregate(constant(true), new StringAggregationStrategy().delimiter("; "))
                .completionSize(2)
                .log("Aggregated body: ${body}")
                .process(e -> {
                  TimeUnit.SECONDS.sleep(4);
                  resultQueue.add("aggregation processing complete: " + e.getIn().getBody());
                })
                .split(body(), "; ")
                .to("direct:ack");

            from("direct:ack")
                .aggregate(body(), Objects::requireNonNullElse)
                .completionSize(2)
                .process(e -> {
                  resultQueue.add("ack due to all completions: " + e.getIn().getBody());
                });
          }
        }
    );
    context.start();

    context.createProducerTemplate().sendBody("direct:start", "Hello!");
    context.createProducerTemplate().sendBody("direct:start", "World!");
    TimeUnit.SECONDS.sleep(10);

    assertThat(resultQueue).containsExactly(
        "direct route complete: Hello!",
        "direct route complete: World!",
        "aggregation processing complete: Hello!; World!",
        "ack due to all completions: Hello!",
        "ack due to all completions: World!"
    ).inOrder();
  }
}
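For reference, the completion counting that the third example does with a second aggregator can also be sketched in plain Java. This is only a sketch under assumptions, not Camel API: CompletionTracker, its constructor arguments, and complete() are hypothetical names introduced here for illustration. Like the third example it keys on the message body, so a real application would probably key on a unique message ID instead.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not part of Camel): fires an ack callback for a key
 * once that key has been reported complete by every downstream branch.
 */
final class CompletionTracker<K> {
  private final int expectedCompletions;
  private final Consumer<K> ack;
  // Number of completions seen so far for each in-flight key.
  private final Map<K, Integer> seen = new ConcurrentHashMap<>();

  CompletionTracker(int expectedCompletions, Consumer<K> ack) {
    if (expectedCompletions < 1) {
      throw new IllegalArgumentException("expectedCompletions must be >= 1");
    }
    this.expectedCompletions = expectedCompletions;
    this.ack = ack;
  }

  /** Records one branch completion; returns true if this call triggered the ack. */
  boolean complete(K key) {
    boolean[] done = {false};
    // compute() is atomic per key; returning null removes the entry.
    seen.compute(key, (k, count) -> {
      int updated = (count == null ? 0 : count) + 1;
      if (updated < expectedCompletions) {
        return updated;
      }
      done[0] = true;
      return null;
    });
    if (done[0]) {
      // Run the callback outside compute() so it cannot block map updates.
      ack.accept(key);
    }
    return done[0];
  }
}
```

With two multicast branches, a tracker built as new CompletionTracker<String>(2, ackCallback) could be called from the final processor of each branch, once per original message. On the aggregated branch that still means one call per original message after the split. The ack callback runs on whichever branch finishes last.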