I am currently using Camel 1.6.0.

I have a process in my application where a small number of somewhat time-sensitive messages are sent out to various channels (how isn't really relevant: it could be email, Jabber, smoke signals, whatever). Rather than process them one at a time in a queue, I would like to send them out concurrently. The Java code that creates the messages has an endpoint injected into it and sends each message to that endpoint in a loop:
        
        @EndpointInject(name="fooEndpoint")
        protected ProducerTemplate fooTemplate;

        ....

        for(some criteria) {
                ...
                fooTemplate.sendBodyAndHeaders(fooBody, fooHeaders);
        }

This part works fine. The fooEndpoint is created in a Spring DSL:

        <camel:endpoint id="fooEndpoint" uri="seda:fooOutbound" />

A Java DSL RouteBuilder builds the route that processes these messages, like this:

        from(endpoint("foo"))
            .choice()
                .when(header("fooProperty").isEqualTo(SomeEnum.A_VALUE))
                    .choice()
                        .when(header("barProperty").isEqualTo(AnotherEnum.B_VALUE))
                            .to("someEndpoint")
                        .when(header("barProperty").isEqualTo(AnotherEnum.C_VALUE))
                            .beanRef("someBean")
                        .when(header("phoneMediaType").isEqualTo(PhoneMediaType.TEXT))
                            .beanRef("someOtherBean")
                        .otherwise()
                            .to("log:foo:level=ERROR")
                .when(header("blargProperty").isEqualTo(SomeEnum.D_VALUE))
                    .to("log:outbound-messages?level=INFO")
                .otherwise()
                    .to("log:outbound-messages?level=ERROR");


I naively assumed that because I was sending a bunch of new messages in the loop (via fooTemplate.sendBodyAndHeaders) and was using the seda endpoint, each message would be handled on its own thread. When I saw the messages being worked through one after another from a queue, I realized that was not the case.

I tried to remedy this by setting the "concurrentConsumers" property on the consuming endpoint like this:

        from("seda:fooOutbound?concurrentConsumers=8")
            .choice()
            ...

but that gave me this exception:

Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: seda:incidentOutboundContactPending?concurrentConsumers=8 due to: There are 1 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{concurrentConsumers=8}]
        at org.apache.camel.impl.DefaultComponent.createEndpoint(DefaultComponent.java:95)
        at org.apache.camel.impl.DefaultCamelContext.getEndpoint(DefaultCamelContext.java:331)
        ... 70 more

When I set it up using a thread pool instead, it didn't seem to be handling the messages concurrently either, but I didn't look at it all that closely. It sounded to me like a concurrent consumer setup was more in line with what I wanted.

   from(endpoint("foo")).thread(8)
             .choice()
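
To be concrete about the behaviour I am after, here is a minimal plain-Java sketch. It is not Camel code and makes no claim about how the seda component works internally: the class name ConcurrentSendSketch and the methods sendAll and deliver are hypothetical stand-ins I made up for illustration, and it assumes Java 8 or later for the lambda syntax. It only shows several messages being handed to a fixed pool of worker threads instead of being processed one after another.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these names come from Camel.
class ConcurrentSendSketch {

    // Hands every message to a pool of `workers` threads and returns the
    // names of the threads that actually processed something.
    static Set<String> sendAll(List<String> messages, int workers) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Set<String> threadsUsed = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(messages.size());
        try {
            for (final String message : messages) {
                pool.execute(() -> {
                    try {
                        threadsUsed.add(Thread.currentThread().getName());
                        deliver(message);
                    } finally {
                        done.countDown();
                    }
                });
            }
            // Wait (with a generous timeout) until every message was handled.
            done.await(10, TimeUnit.SECONDS);
        } finally {
            pool.shutdown();
        }
        return threadsUsed;
    }

    // Stand-in for the real delivery (email, Jabber, ...): just takes a while.
    static void deliver(String message) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8");
        Set<String> threads = sendAll(messages, 8);
        System.out.println("Worker threads used: " + threads.size());
    }
}
```

With as many workers as messages, each message is picked up by its own thread, which is what I hoped the route consuming from the seda endpoint would do for me.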

Is there something I'm missing? Any help is appreciated.

Ryan
