Hello Camel users,
I am using ActiveMQ and the embedded Camel "engine" that comes with it. I
have to say that I am new to both (Camel and even ActiveMQ).
I am trying to implement a simple message aggregator but I cannot get my
aggregation strategy to work correctly. I am looking for some help on this.
My correlationExpression and completion criteria seem to work OK.
However, no matter what I do inside my "AggregationStrategy.aggregate"
method, the output of the aggregator is unaffected.
My activemq.xml looks like this:
<camelContext id="camel" trace="true"
              xmlns="http://camel.apache.org/schema/spring">
  <route id="InvalAggregation">
    <from uri="broker:queue:QIN"/>
    <aggregate completionSize="3" strategyRef="aggregatorStrategy">
      <correlationExpression>
        <constant>true</constant>
      </correlationExpression>
      <to uri="broker:queue:QOUT"/>
    </aggregate>
  </route>
</camelContext>
<bean id="aggregatorStrategy" class="MyAggregationStrategy"/>
I have set the correlationExpression to <constant>true</constant> and the
completionSize to "3" so that every group of 3 messages generates a single
message on the output queue.
I do see that 1 message is generated on QOUT for every 3 incoming messages on
QIN. So both correlationExpression and completionSize behave OK.
Now, the problem is with my AggregationStrategy. It looks like this:
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.Message;

// Simply combines Exchange String body values using '+' as a delimiter
class MyAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            return newExchange;
        }
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody + "+" + newBody);
        return oldExchange;
    }
}
With the above code, when I send 6 messages on QIN with bodies: "A", "B",
"C", "D", "E", "F", QOUT gets 2 messages: "A" and "D".
What I expected on QOUT was 2 aggregated messages with bodies "A+B+C" and
"D+E+F".
The above observation is based on sending/reading messages via ActiveMQ Web
console or with actual ActiveMQ producer/consumer processes.
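To make the expected result concrete, here is a minimal plain-Java sketch of the pairwise folding I expect from the aggregator. It does not use Camel at all: the class AggregationSketch and its methods combine and aggregateInGroups are hypothetical names made up for this illustration, and the grouping loop is only my assumption of how completionSize groups the messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; no Camel classes are involved.
// All names here are hypothetical and exist just for this sketch.
public class AggregationSketch {

    // Mirrors the body handling of MyAggregationStrategy.aggregate on plain strings.
    public static String combine(String oldBody, String newBody) {
        if (oldBody == null) {
            return newBody; // first message of a group
        }
        return oldBody + "+" + newBody;
    }

    // Folds the bodies pairwise and emits one result per completionSize inputs.
    public static List<String> aggregateInGroups(List<String> bodies, int completionSize) {
        List<String> out = new ArrayList<>();
        String current = null;
        int count = 0;
        for (String body : bodies) {
            current = combine(current, body);
            count++;
            if (count == completionSize) {
                out.add(current);
                current = null;
                count = 0;
            }
        }
        return out; // an incomplete trailing group is not emitted
    }

    public static void main(String[] args) {
        List<String> in = Arrays.asList("A", "B", "C", "D", "E", "F");
        System.out.println(aggregateInGroups(in, 3)); // prints [A+B+C, D+E+F]
    }
}
```

This is the output I expect to read from QOUT, and it matches what the Camel trace log reports for the aggregated exchanges.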
The strange thing is that if I look at the ActiveMQ log, I see that Camel
does aggregate the messages as expected. But somehow the ActiveMQ queue QOUT
gets something different:
(simplified log for readability):
....
INFO | Apache ActiveMQ 5.10.0 is starting
....
INFO | Apache Camel 2.13.1 (CamelContext: camel) started in 0.675 seconds
....
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:A
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:B
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:C
INFO | (InvalAggregation) aggregate[true] --> broker://queue:QOUT <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:A+B+C
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:D
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:E
INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:F
INFO | (InvalAggregation) aggregate[true] --> broker://queue:QOUT <<<
Pattern:InOnly, Headers:{...}, BodyType:String, Body:D+E+F
There must be something fundamental I missed! Does anyone have any idea?
Many thanks in advance for your help!
Hedi