Hi,

Multicast has an aggregation strategy that lets you combine the two responses however you like; the output of that strategy is what continues to be routed after the multicast block.
On Fri, Feb 14, 2025 at 4:42 AM Andrew Parmet <andrew.par...@gmail.com> wrote: > I'm in a scenario where I have one input subscription to a message queue. > It's prohibitively expensive to subscribe twice, but I would like to take > two separate actions for the messages I receive. I'm using multicast to do > so, but I would also like to aggregate on one of those routes. Multicasting > to two direct routes and then aggregating on one works just fine until it > comes time to acknowledge the message from the input subscription. > > I should only acknowledge the message once both downstream actions are > completed. Both are idempotent so redelivery isn't a problem, but dropping > data due to an early acknowledgment followed by process failure is a > problem. > > My problem is I can't find a simple way to synchronize on the completed > state of the first direct route and the completed processing after > aggregating the second direct route. In other applications we use end() > after multicast() to synchronize on the completions of downstream routes, > and that does work. > > I'm attaching three minimal examples demonstrating the behavior I've > described: > > The first example is set up as my application is set up. Ideally I can get > this example to pass its assertion without too much extra configuration. > > The second example is set up as our other applications are set up, and the > acknowledgment after multicast correctly waits until all downstream > processing is complete. > > The third example is the only way I've found to achieve proper > synchronization, but it involves tracking completion state and stitching > messages back together and aggregating until I've received each one back as > many times as I've multicasted it. It does work, but it is ugly. > > My question is: Is there a way to acknowledge from the end of the multicast > declaration even when using aggregation? Or is the third example the best > way to ensure I don't acknowledge messages too early? 
> > This is tested with Camel 4.10.0. > > Thanks, > Andrew > > Examples: > > package com.example; > > import static com.google.common.truth.Truth.assertThat; > > import java.util.Objects; > import java.util.concurrent.LinkedBlockingQueue; > import java.util.concurrent.TimeUnit; > import org.apache.camel.CamelContext; > import org.apache.camel.builder.RouteBuilder; > import org.apache.camel.impl.DefaultCamelContext; > import org.apache.camel.processor.aggregate.StringAggregationStrategy; > import org.junit.jupiter.api.Test; > > public class CamelAggregationExamples { > private final LinkedBlockingQueue<String> resultQueue = new > LinkedBlockingQueue<>(); > > @Test > public void test_naive_ack_after_multicast_with_aggregation() throws > Exception { > CamelContext context = new DefaultCamelContext(); > context.addRoutes( > new RouteBuilder() { > @Override > public void configure() { > from("direct:start") > .log("Receiving message: ${body}") > .multicast() > .to("direct:a", "direct:b") > .end() > .log("multicast complete") > .process(e -> resultQueue.add("ack message; > multicast complete: " + e.getIn().getBody())); > > from("direct:a") > .log("Receiving by a: ${body}") > .process(e -> { > TimeUnit.SECONDS.sleep(1); > resultQueue.add("direct route complete: " + > e.getIn().getBody()); > }); > > from("direct:b") > .aggregate(constant(true), new > StringAggregationStrategy().delimiter("; ")) > .completionSize(2) > .log("Aggregated body: ${body}") > .process(e -> { > TimeUnit.SECONDS.sleep(4); > resultQueue.add("aggregation processing > complete: " + e.getIn().getBody()); > }); > } > } > ); > context.start(); > > context.createProducerTemplate().sendBody("direct:start", > "Hello!"); > context.createProducerTemplate().sendBody("direct:start", > "World!"); > > TimeUnit.SECONDS.sleep(10); > > // actual: [ > // direct route complete: Hello!, > // ack message; multicast complete: Hello!, > // direct route complete: World!, > // ack message; multicast complete: World!, > // 
aggregation processing complete: Hello!; World! > // ] > assertThat(resultQueue).containsExactly( > "direct route complete: Hello!", > "direct route complete: World!", > "aggregation processing complete: Hello!; World!", > "ack message; multicast complete: Hello!", > "ack message; multicast complete: World!" > ).inOrder(); > } > > @Test > public void test_naive_ack_without_aggregation() throws Exception { > CamelContext context = new DefaultCamelContext(); > context.addRoutes( > new RouteBuilder() { > @Override > public void configure() { > from("direct:start") > .log("Receiving message: ${body}") > .multicast() > .to("direct:a", "direct:b") > .end() > .log("multicast complete") > .process(e -> resultQueue.add("ack message; > multicast complete: " + e.getIn().getBody())); > > from("direct:a") > .log("Receiving by a: ${body}") > .process(e -> { > TimeUnit.SECONDS.sleep(1); > resultQueue.add("direct route a complete: " + > e.getIn().getBody()); > }); > > from("direct:b") > .process(e -> { > TimeUnit.SECONDS.sleep(4); > resultQueue.add("direct route b complete: " + > e.getIn().getBody()); > }); > } > } > ); > context.start(); > > context.createProducerTemplate().sendBody("direct:start", > "Hello!"); > context.createProducerTemplate().sendBody("direct:start", > "World!"); > > TimeUnit.SECONDS.sleep(10); > > // this is correct > assertThat(resultQueue).containsExactly( > "direct route a complete: Hello!", > "direct route b complete: Hello!", > "ack message; multicast complete: Hello!", > "direct route a complete: World!", > "direct route b complete: World!", > "ack message; multicast complete: World!" 
> ).inOrder(); > } > > @Test > public void test_complex_ack_after_multicast_aggregation() throws > Exception { > CamelContext context = new DefaultCamelContext(); > context.addRoutes( > new RouteBuilder() { > @Override > public void configure() { > from("direct:start") > .log("Receiving message: ${body}") > .multicast() > .to("direct:a", "direct:b") > .end() > .log("multicast complete"); > > from("direct:a") > .log("Receiving by a: ${body}") > .process(e -> { > TimeUnit.SECONDS.sleep(1); > resultQueue.add("direct route complete: " + > e.getIn().getBody()); > }) > .to("direct:ack"); > > from("direct:b") > .aggregate(constant(true), new > StringAggregationStrategy().delimiter("; ")) > .completionSize(2) > .log("Aggregated body: ${body}") > .process(e -> { > TimeUnit.SECONDS.sleep(4); > resultQueue.add("aggregation processing > complete: " + e.getIn().getBody()); > }) > .split(body(), "; ") > .to("direct:ack"); > > from("direct:ack") > .aggregate(body(), Objects::requireNonNullElse) > .completionSize(2) > .process(e -> { > resultQueue.add("ack due to all completions: " > + e.getIn().getBody()); > }); > } > } > ); > context.start(); > > context.createProducerTemplate().sendBody("direct:start", > "Hello!"); > context.createProducerTemplate().sendBody("direct:start", > "World!"); > > TimeUnit.SECONDS.sleep(10); > > assertThat(resultQueue).containsExactly( > "direct route complete: Hello!", > "direct route complete: World!", > "aggregation processing complete: Hello!; World!", > "ack due to all completions: Hello!", > "ack due to all completions: World!" > ).inOrder(); > } > } > -- Claus Ibsen ----------------- @davsclaus Camel in Action 2: https://www.manning.com/ibsen2