Hello, I need to implement a split/aggregate route like the one in the official
documentation example, but with slightly more complex logic:

From the docs: http://camel.apache.org/composed-message-processor.html

    from("direct:start")
        .split(body().tokenize("@"), new MyOrderStrategy())
            .to("bean:MyOrderService?method=handleOrder")
            // ONE MORE AGGREGATOR WITH SPECIFIC CORRELATION ID.
        .end()
        .to("bean:MyOrderService?method=buildCombinedResponse")

While processing the split messages, I need to aggregate some of them into
one message and receive that message in the split's aggregation strategy
(MyOrderStrategy here), along with the other, non-aggregated messages.
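
To make the intended data flow concrete, here is a minimal plain-Java sketch of the desired result. It does not use Camel at all and is not a solution to the routing problem; it only models what the outer strategy should end up receiving. The class name, the "range:" prefix used as a correlation key, and the output format are all assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class NestedAggregationSketch {

    /** Hypothetical rule: parts starting with "range:" share the correlation key "range". */
    static String correlationKey(String part) {
        return part.startsWith("range:") ? "range" : null;
    }

    /**
     * Splits the body, merges the parts that share a correlation key into one
     * message each, and returns what the outer aggregation should see:
     * the untouched parts first, followed by one merged message per key.
     */
    static List<String> splitAndAggregate(String body, String delimiter) {
        List<String> passThrough = new ArrayList<>();
        Map<String, List<String>> groups = new LinkedHashMap<>();

        for (String part : body.split(Pattern.quote(delimiter))) {
            String key = correlationKey(part);
            if (key == null) {
                passThrough.add(part);   // not correlated: goes to the outer aggregation as-is
            } else {
                groups.computeIfAbsent(key, k -> new ArrayList<>()).add(part);
            }
        }

        List<String> outer = new ArrayList<>(passThrough);
        for (Map.Entry<String, List<String>> e : groups.entrySet()) {
            // inner aggregation: one combined message per correlation key
            outer.add(e.getKey() + "[" + String.join("+", e.getValue()) + "]");
        }
        return outer;
    }

    public static void main(String[] args) {
        // prints [single:USER, range[range:from=1+range:to=9]]
        System.out.println(splitAndAggregate("single:USER@range:from=1@range:to=9", "@"));
    }
}
```

In this sketch the outer list receives two entries, not three: the single part unchanged, plus one combined message for the two "range" parts. That is the behaviour I am trying to get from the route.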

*Is it possible to implement this using Camel?*

If I put the aggregator after bean:MyOrderService, it successfully aggregates
the messages selected by the specific correlation ID into one message, as
required, but the MyOrderStrategy aggregator still receives the output of
bean:MyOrderService rather than the aggregated message:

    @Override
    public void configure() {
        from("direct:list")
            // Receives -> {"search":"USER","date-type":{"end_date","from":49449600,"to":1456531200}}
            .process(filtersRequestProcessor)
            // Converts to -> {"search":"USER"}\n{"date-type":{"end_date", "from":49449600,"to":1456531200}}
            .split(body().tokenize("\n"), new MyAgg())
                .bean(FilterTypeMapper.class)
                .choice()
                    .when(header("filter.type").isEqualTo("single")) // {"search":"USER"}
                        .log("Go to single")
                        .process(new SingleFilterProcessor())
                    .when(header("filter.type").isEqualTo("daterange")) // {"date-type":{"end_date", "from":49449600,"to":1456531200}}
                        .log("Go to daterange")
                        .process(new DateRangeFilterProcessor()) // Goes to aggregation MyAgg() !!!!
                .end()
                .log("Sending to collections: ${body}")
                .to("direct:collections")
            .end()
            .to("direct:aggregated");

        from("direct:collections")
            .log("Goes to DynamicRouter::routeCollection with header: ${headers.filter.collection}")
            .choice()
                .when(header("filter.collection").isEqualTo("range"))
                    .to("direct:range")
                .otherwise(); // Goes to aggregation MyAgg()

        from("direct:range")
            .aggregate(header("filter.collection"), filterCollectionAggregationStrategy)
                .completionSize(exchangeProperty("rangeCount"))
                // Aggregation completed successfully: Setting bean invocation result on the OUT message:
                // org.springframework.data.mongodb.core.query.Criteria@e143c6a3
                .bean(MyService, "processMessage"); // Does NOT go to aggregation MyAgg() !!!!

        /**
         * Process aggregated query
         */
        from("direct:aggregated")
            .transform(body())
            .bean(MyService, "retrieveAll")
            .to("direct:finish");
    }




--
View this message in context: 
http://camel.465427.n5.nabble.com/Nested-aggregation-with-split-tp5780607.html
Sent from the Camel - Users mailing list archive at Nabble.com.
