Hi

Try adding a <convertBodyTo type="String"/> before the <aggregate> in your route.
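
For illustration, here is a minimal sketch of where that element would go. It assumes the route from the quoted message below is otherwise unchanged, and it is only something to try, not a confirmed diagnosis:

```xml
<!-- Sketch only: the route from the quoted message, with one added step -->
<route id="InvalAggregation">
    <from uri="broker:queue:QIN"/>
    <!-- Added: convert the body to a String before it reaches the aggregator -->
    <convertBodyTo type="String"/>
    <aggregate completionSize="3" strategyRef="aggregatorStrategy">
        <correlationExpression>
            <constant>true</constant>
        </correlationExpression>
        <to uri="broker:queue:QOUT"/>
    </aggregate>
</route>
```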

Also, which versions of ActiveMQ and Camel are you using?

On Wed, Feb 25, 2015 at 5:41 PM, Hedi <[email protected]> wrote:
> Hello Camel users,
>
> I am using ActiveMQ and the embedded Camel "engine" that comes with it. I
> have to say that I am new to both (Camel and even ActiveMQ).
>
> I am trying to implement a simple message aggregator but I cannot get my
> aggregation strategy to work correctly. I am looking for some help on this.
>
> Basically, my correlationExpression and Completion criteria seem to work OK.
> However, no matter what I do inside my "AggregationStrategy.aggregate"
> method, the output of the aggregator seems unaffected.
>
> My activemq.xml looks like this:
>
>     <camelContext id="camel" trace="true"
> xmlns="http://camel.apache.org/schema/spring">
>         <route id="InvalAggregation">
>             <from uri="broker:queue:QIN"/>
>             <aggregate completionSize="3" strategyRef="aggregatorStrategy">
>                 <correlationExpression>
>                     <constant>true</constant>
>                 </correlationExpression>
>                 <to uri="broker:queue:QOUT"/>
>             </aggregate>
>         </route>
>     </camelContext>
>
>     <bean id="aggregatorStrategy" class="MyAggregationStrategy"/>
>
> I have set the correlationExpression to <constant>true</constant> and the
> completionSize to "3" so that every group of 3 messages generates a single
> message on the output queue.
> I do see that 1 message is generated on QOUT for every 3 incoming messages on
> QIN. So both correlationExpression and completionSize behave OK.
>
>
> Now, the problem is with my AggregationStrategy. It looks like this:
>
> import org.apache.camel.processor.aggregate.AggregationStrategy;
> import org.apache.camel.Exchange;
> import org.apache.camel.Message;
>
> //simply combines Exchange String body values using '+' as a delimiter
> class MyAggregationStrategy implements AggregationStrategy {
>
>     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>
>         if (oldExchange == null) {
>
>             return newExchange;
>         }
>
>         String oldBody = oldExchange.getIn().getBody(String.class);
>         String newBody = newExchange.getIn().getBody(String.class);
>         oldExchange.getIn().setBody(oldBody + "+" + newBody);
>
>         return oldExchange;
>
>     }
> }
>
> With the above code, when I send 6 messages on QIN with bodies: "A", "B",
> "C", "D", "E", "F", QOUT gets 2 messages: "A" and "D".
> What I expected on QOUT was 2 aggregated messages with bodies "A+B+C" and
> "D+E+F".
> The above observation is based on sending/reading messages via ActiveMQ Web
> console or with actual ActiveMQ producer/consumer processes.
>
> The strange thing is that if I look at the ActiveMQ log, I see that Camel
> does aggregate the messages as expected. But somehow the ActiveMQ queue QOUT
> gets something different:
>
> (simplified log for readability):
> ....
> INFO | Apache ActiveMQ 5.10.0 is starting
> ....
> INFO | Apache Camel 2.13.1 (CamelContext: camel) started in 0.675 seconds
> ....
> INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
> Pattern:InOnly, Headers:{...}, BodyType:String, Body:A
> INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
> Pattern:InOnly, Headers:{...}, BodyType:String, Body:B
> INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
> Pattern:InOnly, Headers:{...}, BodyType:String, Body:C
> INFO | (InvalAggregation)      aggregate[true] --> broker://queue:QOUT <<<
> Pattern:InOnly, Headers:{...}, BodyType:String, Body:A+B+C
> INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
> Pattern:InOnly, Headers:{...}, BodyType:String, Body:D
> INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
> Pattern:InOnly, Headers:{...}, BodyType:String, Body:E
> INFO | (InvalAggregation) from(broker://queue:QIN) --> aggregate[true] <<<
> Pattern:InOnly, Headers:{...}, BodyType:String, Body:F
> INFO | (InvalAggregation)     aggregate[true] --> broker://queue:QOUT <<<
> Pattern:InOnly, Headers:{...}, BodyType:String, Body:D+E+F
>
> There must be something fundamental I missed! Does anyone have any idea?
>
> Many thanks in advance for your help!
> Hedi
>
>
>
>
> --
> View this message in context: 
> http://camel.465427.n5.nabble.com/Aggregator-output-not-aggregated-tp5763177.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: [email protected]
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
hawtio: http://hawt.io/
fabric8: http://fabric8.io/
