I am currently using Camel 1.6.0.
I have a process in my application where a small number of somewhat
time-sensitive messages are sent out to various channels (how isn't
really relevant - could be email, jabber, smoke signals, whatever).
Rather than process them one at a time from a queue, I would like to
send them out concurrently. The Java code that creates the messages
has an endpoint injected into it and sends each message to that
endpoint in a loop:
@EndpointInject(name="fooEndpoint")
protected ProducerTemplate fooTemplate;
....
for(some criteria) {
    ...
    fooTemplate.sendBodyAndHeaders(fooBody, fooHeaders);
}
This part works fine. The fooEndpoint is defined in the Spring DSL:
<camel:endpoint id="fooEndpoint" uri="seda:fooOutbound" />
A Java DSL RouteBuilder builds the route that processes these messages,
like this:
from(endpoint("foo"))
    .choice()
        .when(header("fooProperty").isEqualTo(SomeEnum.A_VALUE))
            .choice()
                .when(header("barProperty").isEqualTo(AnotherEnum.B_VALUE))
                    .to("someEndpoint")
                .when(header("barProperty").isEqualTo(AnotherEnum.C_VALUE))
                    .beanRef("someBean")
                .when(header("phoneMediaType").isEqualTo(PhoneMediaType.TEXT))
                    .beanRef("someOtherBean")
                .otherwise()
                    .to("log:foo:level=ERROR")
        .when(header("blargProperty").isEqualTo(SomeEnum.D_VALUE))
            .to("log:outbound-messages?level=INFO")
        .otherwise()
            .to("log:outbound-messages?level=ERROR");
I naively assumed that because each call to
fooTemplate.sendBodyAndHeaders in the loop sends a new message to a
seda endpoint, Camel would create a new thread to handle each one.
When I saw the messages being queued up and processed one after
another, I realized that was not the case.
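To make the distinction concrete, here is a minimal plain-Java sketch of what I mean. It is not Camel code and makes no claim about how the seda component is implemented; the class name SedaSketch and its run method are made up for illustration. It puts messages on one in-memory queue and drains them with a configurable number of consumer threads, then reports which threads actually did the work. With one consumer everything is handled by the same thread no matter how fast the loop produces; with several consumers the work can be spread across them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only: a queue drained by N consumer threads.
public class SedaSketch {

    // Sends `messages` items through one queue, processes them with
    // `consumers` threads, and returns the names of the threads that
    // handled at least one message.
    public static Set<String> run(int consumers, int messages)
            throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Set<String> workers = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(messages);
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < consumers; i++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        queue.take(); // blocks until a message is available
                        workers.add(Thread.currentThread().getName());
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    // Interrupted on shutdown: restore the flag and exit.
                    Thread.currentThread().interrupt();
                }
            }, "consumer-" + i);
            t.setDaemon(true);
            t.start();
            threads.add(t);
        }

        // The producer loop only enqueues; it never processes a message itself.
        for (int i = 0; i < messages; i++) {
            queue.put("message-" + i);
        }

        done.await(); // wait until every message has been handled
        for (Thread t : threads) {
            t.interrupt(); // stop the idle consumers
        }
        return workers;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1 consumer:  " + run(1, 20));
        System.out.println("8 consumers: " + run(8, 20));
    }
}
```

With a single consumer the returned set always contains exactly one thread name. With eight consumers it contains between one and eight names, depending on scheduling.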
I tried to remedy this by setting the "concurrentConsumers" property
on the consuming endpoint like this:
from("seda:fooOutbound?concurrentConsumers=8")
.choice()
...
but that gave me this exception:
Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: seda:incidentOutboundContactPending?concurrentConsumers=8 due to: There are 1 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{concurrentConsumers=8}]
    at org.apache.camel.impl.DefaultComponent.createEndpoint(DefaultComponent.java:95)
    at org.apache.camel.impl.DefaultCamelContext.getEndpoint(DefaultCamelContext.java:331)
    ... 70 more
When I set it up using a thread pool instead, it didn't seem to handle
the messages concurrently either, though I didn't look at it all that
closely. A concurrent consumer setup sounded more in line with what I
wanted. This is the thread pool version I tried:
from(endpoint("foo")).thread(8)
.choice()
Is there something I'm missing? Any help is appreciated.
Ryan