I have a single input subscription to a message queue. Subscribing twice is
prohibitively expensive, but I need to take two separate actions on each
message I receive. I'm using multicast for that, and I also need to
aggregate on one of the two routes. Multicasting to two direct routes and
aggregating on one works fine until it comes time to acknowledge the
message from the input subscription.

I should only acknowledge the message once both downstream actions are
completed. Both are idempotent so redelivery isn't a problem, but dropping
data due to an early acknowledgment followed by process failure is a
problem.

My problem is that I can't find a simple way to wait for both the
completion of the first direct route and the completion of the processing
that follows aggregation on the second direct route. In other applications,
which don't aggregate, we use end() after multicast() to synchronize on the
completion of the downstream routes, and that does work.

I'm attaching three minimal examples demonstrating the behavior I've
described:

The first example is set up as my application is set up. Ideally I can get
this example to pass its assertion without too much extra configuration.

The second example is set up as our other applications are set up, and the
acknowledgment after multicast correctly waits until all downstream
processing is complete.

The third example is the only way I've found to achieve proper
synchronization. It tracks completion state by routing each message back
from both branches and aggregating on an ack route until each message has
come back as many times as it was multicast. It does work, but it is ugly.
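
Stripped of Camel, the bookkeeping in the third example amounts to a
per-message countdown. Here is a minimal sketch of that idea, assuming a
hypothetical AckTracker class (my own illustration, not a Camel API and not
code from my application):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper: acknowledges a message once every branch
// has reported that it finished processing that message.
final class AckTracker<K> {
    private final int branches;
    private final Consumer<K> ack;
    private final Map<K, Integer> completed = new ConcurrentHashMap<>();

    AckTracker(int branches, Consumer<K> ack) {
        this.branches = branches;
        this.ack = ack;
    }

    // Called by each branch when it is done with the message identified by key.
    public void complete(K key) {
        // merge() is atomic per key on ConcurrentHashMap, so exactly one
        // caller observes the final count.
        int count = completed.merge(key, 1, Integer::sum);
        if (count == branches) {
            completed.remove(key);
            ack.accept(key);
        }
    }
}
```

In the third example the direct:ack route plays this role, with
completionSize(2) as the branch count and the message body as the key.
Keying on the body assumes bodies are unique; a message ID would probably
be a safer key.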

My question is: Is there a way to acknowledge from the end of the multicast
declaration even when using aggregation? Or is the third example the best
way to ensure I don't acknowledge messages too early?

This is tested with Camel 4.10.0.

Thanks,
Andrew

Examples:

package com.example;

import static com.google.common.truth.Truth.assertThat;

import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.processor.aggregate.StringAggregationStrategy;
import org.junit.jupiter.api.Test;

public class CamelAggregationExamples {
    private final LinkedBlockingQueue<String> resultQueue = new LinkedBlockingQueue<>();

    @Test
    public void test_naive_ack_after_multicast_with_aggregation() throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(
            new RouteBuilder() {
                @Override
                public void configure() {
                    from("direct:start")
                        .log("Receiving message: ${body}")
                        .multicast()
                        .to("direct:a", "direct:b")
                        .end()
                        .log("multicast complete")
                        .process(e -> resultQueue.add(
                            "ack message; multicast complete: " + e.getIn().getBody()));

                    from("direct:a")
                        .log("Receiving by a: ${body}")
                        .process(e -> {
                            TimeUnit.SECONDS.sleep(1);
                            resultQueue.add(
                                "direct route complete: " + e.getIn().getBody());
                        });

                    from("direct:b")
                        .aggregate(constant(true),
                            new StringAggregationStrategy().delimiter("; "))
                        .completionSize(2)
                        .log("Aggregated body: ${body}")
                        .process(e -> {
                            TimeUnit.SECONDS.sleep(4);
                            resultQueue.add(
                                "aggregation processing complete: " + e.getIn().getBody());
                        });
                }
            }
        );
        context.start();

        context.createProducerTemplate().sendBody("direct:start", "Hello!");
        context.createProducerTemplate().sendBody("direct:start", "World!");

        TimeUnit.SECONDS.sleep(10);

        // actual: [
        //     direct route complete: Hello!,
        //     ack message; multicast complete: Hello!,
        //     direct route complete: World!,
        //     ack message; multicast complete: World!,
        //     aggregation processing complete: Hello!; World!
        // ]
        assertThat(resultQueue).containsExactly(
            "direct route complete: Hello!",
            "direct route complete: World!",
            "aggregation processing complete: Hello!; World!",
            "ack message; multicast complete: Hello!",
            "ack message; multicast complete: World!"
        ).inOrder();
    }

    @Test
    public void test_naive_ack_without_aggregation() throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(
            new RouteBuilder() {
                @Override
                public void configure() {
                    from("direct:start")
                        .log("Receiving message: ${body}")
                        .multicast()
                        .to("direct:a", "direct:b")
                        .end()
                        .log("multicast complete")
                        .process(e -> resultQueue.add(
                            "ack message; multicast complete: " + e.getIn().getBody()));

                    from("direct:a")
                        .log("Receiving by a: ${body}")
                        .process(e -> {
                            TimeUnit.SECONDS.sleep(1);
                            resultQueue.add(
                                "direct route a complete: " + e.getIn().getBody());
                        });

                    from("direct:b")
                        .process(e -> {
                            TimeUnit.SECONDS.sleep(4);
                            resultQueue.add(
                                "direct route b complete: " + e.getIn().getBody());
                        });
                }
            }
        );
        context.start();

        context.createProducerTemplate().sendBody("direct:start", "Hello!");
        context.createProducerTemplate().sendBody("direct:start", "World!");

        TimeUnit.SECONDS.sleep(10);

        // this is correct
        assertThat(resultQueue).containsExactly(
            "direct route a complete: Hello!",
            "direct route b complete: Hello!",
            "ack message; multicast complete: Hello!",
            "direct route a complete: World!",
            "direct route b complete: World!",
            "ack message; multicast complete: World!"
        ).inOrder();
    }

    @Test
    public void test_complex_ack_after_multicast_aggregation() throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(
            new RouteBuilder() {
                @Override
                public void configure() {
                    from("direct:start")
                        .log("Receiving message: ${body}")
                        .multicast()
                        .to("direct:a", "direct:b")
                        .end()
                        .log("multicast complete");

                    from("direct:a")
                        .log("Receiving by a: ${body}")
                        .process(e -> {
                            TimeUnit.SECONDS.sleep(1);
                            resultQueue.add(
                                "direct route complete: " + e.getIn().getBody());
                        })
                        .to("direct:ack");

                    from("direct:b")
                        .aggregate(constant(true),
                            new StringAggregationStrategy().delimiter("; "))
                        .completionSize(2)
                        .log("Aggregated body: ${body}")
                        .process(e -> {
                            TimeUnit.SECONDS.sleep(4);
                            resultQueue.add(
                                "aggregation processing complete: " + e.getIn().getBody());
                        })
                        .split(body(), "; ")
                        .to("direct:ack");

                    from("direct:ack")
                        .aggregate(body(), Objects::requireNonNullElse)
                        .completionSize(2)
                        .process(e -> {
                            resultQueue.add(
                                "ack due to all completions: " + e.getIn().getBody());
                        });
                }
            }
        );
        context.start();

        context.createProducerTemplate().sendBody("direct:start", "Hello!");
        context.createProducerTemplate().sendBody("direct:start", "World!");

        TimeUnit.SECONDS.sleep(10);

        assertThat(resultQueue).containsExactly(
            "direct route complete: Hello!",
            "direct route complete: World!",
            "aggregation processing complete: Hello!; World!",
            "ack due to all completions: Hello!",
            "ack due to all completions: World!"
        ).inOrder();
    }
}
