Hi

Multicast supports an aggregation strategy, which lets you combine the two
responses however you like. The output of that strategy is what continues
to be routed after the multicast block.
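
As a rough sketch of that idea (not taken from the attached examples), the
route below passes an AggregationStrategy to multicast() so that the replies
from both branches are merged before routing continues. The class name
MulticastAggregationSketch, the JoinBodies strategy, and the simplified
direct:a and direct:b routes are made up for illustration. It only shows how
the two replies are combined; it does not make the multicast wait for a
separate aggregate() inside one of the branches.

```java
import org.apache.camel.AggregationStrategy;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class MulticastAggregationSketch {

    /** Hypothetical strategy: joins the reply bodies of the branches with "; ". */
    static class JoinBodies implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                // First reply: nothing to merge with yet.
                return newExchange;
            }
            String merged = oldExchange.getMessage().getBody(String.class)
                + "; " + newExchange.getMessage().getBody(String.class);
            oldExchange.getMessage().setBody(merged);
            return oldExchange;
        }
    }

    /** Sends one message through the multicast and returns the combined reply. */
    static String run(String body) throws Exception {
        CamelContext context = new DefaultCamelContext();
        try {
            context.addRoutes(new RouteBuilder() {
                @Override
                public void configure() {
                    from("direct:start")
                        // The strategy decides what the exchange looks like after end().
                        .multicast(new JoinBodies())
                            .to("direct:a", "direct:b")
                        .end()
                        .log("combined: ${body}");

                    // Simplified stand-ins for the two downstream actions.
                    from("direct:a").transform(simple("a saw ${body}"));
                    from("direct:b").transform(simple("b saw ${body}"));
                }
            });
            context.start();
            return context.createProducerTemplate()
                .requestBody("direct:start", body, String.class);
        } finally {
            context.stop();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run("Hello!"));
    }
}
```

With the default sequential multicast, the body after end() should be the
reply from direct:a followed by the reply from direct:b, joined by "; ".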



On Fri, Feb 14, 2025 at 4:42 AM Andrew Parmet <andrew.par...@gmail.com>
wrote:

> I'm in a scenario where I have one input subscription to a message queue.
> It's prohibitively expensive to subscribe twice, but I would like to take
> two separate actions for the messages I receive. I'm using multicast to do
> so, but I would also like to aggregate on one of those routes. Multicasting
> to two direct routes and then aggregating on one works just fine until it
> comes time to acknowledge the message from the input subscription.
>
> I should only acknowledge the message once both downstream actions are
> completed. Both are idempotent so redelivery isn't a problem, but dropping
> data due to an early acknowledgment followed by process failure is a
> problem.
>
> My problem is I can't find a simple way to synchronize on the completed
> state of the first direct route and the completed processing after
> aggregating the second direct route. In other applications we use end()
> after multicast() to synchronize on the completions of downstream routes,
> and that does work.
>
> I'm attaching three minimal examples demonstrating the behavior I've
> described:
>
> The first example is set up as my application is set up. Ideally I can get
> this example to pass its assertion without too much extra configuration.
>
> The second example is set up as our other applications are set up, and the
> acknowledgment after multicast correctly waits until all downstream
> processing is complete.
>
> The third example is the only way I've found to achieve proper
> synchronization, but it involves tracking completion state and stitching
> messages back together and aggregating until I've received each one back as
> many times as I've multicasted it. It does work, but it is ugly.
>
> My question is: Is there a way to acknowledge from the end of the multicast
> declaration even when using aggregation? Or is the third example the best
> way to ensure I don't acknowledge messages too early?
>
> This is tested with Camel 4.10.0.
>
> Thanks,
> Andrew
>
> Examples:
>
> package com.example;
>
> import static com.google.common.truth.Truth.assertThat;
>
> import java.util.Objects;
> import java.util.concurrent.LinkedBlockingQueue;
> import java.util.concurrent.TimeUnit;
> import org.apache.camel.CamelContext;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.apache.camel.processor.aggregate.StringAggregationStrategy;
> import org.junit.jupiter.api.Test;
>
> public class CamelAggregationExamples {
>     private final LinkedBlockingQueue<String> resultQueue = new
> LinkedBlockingQueue<>();
>
>     @Test
>     public void test_naive_ack_after_multicast_with_aggregation() throws
> Exception {
>         CamelContext context = new DefaultCamelContext();
>         context.addRoutes(
>             new RouteBuilder() {
>                 @Override
>                 public void configure() {
>                     from("direct:start")
>                         .log("Receiving message: ${body}")
>                         .multicast()
>                         .to("direct:a", "direct:b")
>                         .end()
>                         .log("multicast complete")
>                         .process(e -> resultQueue.add(
>                             "ack message; multicast complete: " + e.getIn().getBody()));
>
>                     from("direct:a")
>                         .log("Receiving by a: ${body}")
>                         .process(e -> {
>                             TimeUnit.SECONDS.sleep(1);
>                             resultQueue.add("direct route complete: " +
> e.getIn().getBody());
>                         });
>
>                     from("direct:b")
>                         .aggregate(constant(true), new
> StringAggregationStrategy().delimiter("; "))
>                         .completionSize(2)
>                         .log("Aggregated body: ${body}")
>                         .process(e -> {
>                             TimeUnit.SECONDS.sleep(4);
>                             resultQueue.add(
>                                 "aggregation processing complete: " + e.getIn().getBody());
>                         });
>                 }
>             }
>         );
>         context.start();
>
>         context.createProducerTemplate().sendBody("direct:start",
> "Hello!");
>         context.createProducerTemplate().sendBody("direct:start",
> "World!");
>
>         TimeUnit.SECONDS.sleep(10);
>
>         // actual: [
>         //     direct route complete: Hello!,
>         //     ack message; multicast complete: Hello!,
>         //     direct route complete: World!,
>         //     ack message; multicast complete: World!,
>         //     aggregation processing complete: Hello!; World!
>         // ]
>         assertThat(resultQueue).containsExactly(
>             "direct route complete: Hello!",
>             "direct route complete: World!",
>             "aggregation processing complete: Hello!; World!",
>             "ack message; multicast complete: Hello!",
>             "ack message; multicast complete: World!"
>         ).inOrder();
>     }
>
>     @Test
>     public void test_naive_ack_without_aggregation() throws Exception {
>         CamelContext context = new DefaultCamelContext();
>         context.addRoutes(
>             new RouteBuilder() {
>                 @Override
>                 public void configure() {
>                     from("direct:start")
>                         .log("Receiving message: ${body}")
>                         .multicast()
>                         .to("direct:a", "direct:b")
>                         .end()
>                         .log("multicast complete")
>                         .process(e -> resultQueue.add(
>                             "ack message; multicast complete: " + e.getIn().getBody()));
>
>                     from("direct:a")
>                         .log("Receiving by a: ${body}")
>                         .process(e -> {
>                             TimeUnit.SECONDS.sleep(1);
>                             resultQueue.add("direct route a complete: " +
> e.getIn().getBody());
>                         });
>
>                     from("direct:b")
>                         .process(e -> {
>                             TimeUnit.SECONDS.sleep(4);
>                             resultQueue.add("direct route b complete: " +
> e.getIn().getBody());
>                         });
>                 }
>             }
>         );
>         context.start();
>
>         context.createProducerTemplate().sendBody("direct:start",
> "Hello!");
>         context.createProducerTemplate().sendBody("direct:start",
> "World!");
>
>         TimeUnit.SECONDS.sleep(10);
>
>         // this is correct
>         assertThat(resultQueue).containsExactly(
>             "direct route a complete: Hello!",
>             "direct route b complete: Hello!",
>             "ack message; multicast complete: Hello!",
>             "direct route a complete: World!",
>             "direct route b complete: World!",
>             "ack message; multicast complete: World!"
>         ).inOrder();
>     }
>
>     @Test
>     public void test_complex_ack_after_multicast_aggregation() throws
> Exception {
>         CamelContext context = new DefaultCamelContext();
>         context.addRoutes(
>             new RouteBuilder() {
>                 @Override
>                 public void configure() {
>                     from("direct:start")
>                         .log("Receiving message: ${body}")
>                         .multicast()
>                         .to("direct:a", "direct:b")
>                         .end()
>                         .log("multicast complete");
>
>                     from("direct:a")
>                         .log("Receiving by a: ${body}")
>                         .process(e -> {
>                             TimeUnit.SECONDS.sleep(1);
>                             resultQueue.add("direct route complete: " +
> e.getIn().getBody());
>                         })
>                         .to("direct:ack");
>
>                     from("direct:b")
>                         .aggregate(constant(true), new
> StringAggregationStrategy().delimiter("; "))
>                         .completionSize(2)
>                         .log("Aggregated body: ${body}")
>                         .process(e -> {
>                             TimeUnit.SECONDS.sleep(4);
>                             resultQueue.add(
>                                 "aggregation processing complete: " + e.getIn().getBody());
>                         })
>                         .split(body(), "; ")
>                         .to("direct:ack");
>
>                     from("direct:ack")
>                         .aggregate(body(), Objects::requireNonNullElse)
>                         .completionSize(2)
>                         .process(e -> {
>                             resultQueue.add("ack due to all completions: "
> + e.getIn().getBody());
>                         });
>                 }
>             }
>         );
>         context.start();
>
>         context.createProducerTemplate().sendBody("direct:start",
> "Hello!");
>         context.createProducerTemplate().sendBody("direct:start",
> "World!");
>
>         TimeUnit.SECONDS.sleep(10);
>
>         assertThat(resultQueue).containsExactly(
>             "direct route complete: Hello!",
>             "direct route complete: World!",
>             "aggregation processing complete: Hello!; World!",
>             "ack due to all completions: Hello!",
>             "ack due to all completions: World!"
>         ).inOrder();
>     }
> }
>


-- 
Claus Ibsen
-----------------
@davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
