I have routes with the following definition:

public class HttpRouteBuilder extends RouteBuilder {
   ...

    @Override
    public void configure() throws Exception {

        from(ICallbackServiceManager.ORIGIN_ENDPOINT)
            .filter(PredicateBuilder.and(
                header(ICallbackEvent.EVENT_HEADER_IS_TEST).isEqualTo(allowTestEvent),
                header(ICallbackEvent.EVENT_HEADER_EVENT_TYPE).isEqualTo(eventType.name()),
                PredicateBuilder.or(
                    header(ICallbackEvent.EVENT_HEADER_SVC_ID).in(httpEndpoint._getKeywords().toArray()),
                    header(ICallbackEvent.EVENT_HEADER_SVC_ID).regex(httpEndpoint._getKeywords().get(0))
                )
            ))
            .routeId(allowTestEvent ? httpEndpoint._getTestRouteId() : httpEndpoint.getRouteId())
            // The aggregation step this question is about
            .aggregate(constant(true), new BasicBodyListAggregator())
                .completionSize(Config.getInt("callbacks.route.aggregate.size", 2))
                .completionInterval(Config.getInt("callbacks.route.aggregate.interval", 1000))
            .setHeader(Exchange.HTTP_METHOD, constant("POST"))
            .setHeader(Exchange.CONTENT_TYPE,
                constant(dataFormat.getClass().equals(XStreamDataFormat.class)
                    ? "application/xml" : "application/json"))
            .setHeader(Exchange.HTTP_PROTOCOL_VERSION, constant("HTTP/1.0"))
            .marshal(dataFormat)
            .to(httpEndpoint.endpointUri());
    }
}

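The aggregation strategy is:

import java.util.ArrayList;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy; // Camel 2.x package

public class BasicBodyListAggregator implements AggregationStrategy {
    @Override
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        Object newBody = newExchange.getIn().getBody();
        if (oldExchange == null) {
            // First exchange of the group: start a new list holding its body.
            ArrayList<Object> list = new ArrayList<Object>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        } else {
            // Later exchanges: append the body to the list built so far.
            ArrayList<Object> list = oldExchange.getIn().getBody(ArrayList.class);
            list.add(newBody);
            return oldExchange;
        }
    }
}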
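
To rule out the aggregator itself, here is a minimal sketch of the same list-building logic without any Camel types, so it can run on its own. The class BodyListSketch and its aggregate method are hypothetical stand-ins for BasicBodyListAggregator, not part of the route above; a null first argument plays the role of oldExchange == null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the list-building logic of BasicBodyListAggregator,
// written without Camel types so that it runs on its own.
class BodyListSketch {

    // A null 'aggregated' plays the role of oldExchange == null.
    static List<Object> aggregate(List<Object> aggregated, Object newBody) {
        if (aggregated == null) {
            aggregated = new ArrayList<Object>();
        }
        aggregated.add(newBody);
        return aggregated;
    }

    public static void main(String[] args) {
        // One message inside the completion interval: still a list, of size 1.
        List<Object> single = aggregate(null, "event-1");
        System.out.println(single.size() + " " + single); // prints "1 [event-1]"

        // Two messages: a list of size 2.
        List<Object> pair = aggregate(aggregate(null, "event-1"), "event-2");
        System.out.println(pair.size() + " " + pair);     // prints "2 [event-1, event-2]"
    }
}
```

If the real aggregator behaves the same way, the body handed to marshal(dataFormat) is a one-element list even when only one exchange arrives, which would suggest the list is lost during serialization rather than during aggregation.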

The issue is that when only one exchange is processed within the
completionInterval of 1 s, the serialized output does not contain a list
holding that single message. "ExternalEvent" is serialized as a plain
object instead of a one-element array:
{
    "list": {
        "ExternalEvent": {
            "eventTime": 1408900445,
            "phoneNo": 9739772042,
            "content": "FK OFFERS something",
            "carrier": "vodafone",
            "location": "Bangalore"
        }
    }
}

The expected output is:
{
    "list": {
        "ExternalEvent": [
            {
                "eventTime": 1408900445,
                "phoneNo": 9739772042,
                "content": "FK OFFERS something",
                "carrier": "vodafone",
                "location": "Bangalore"
            }
        ]
    }
}

However, it works fine when more than one exchange is processed within the
completionInterval:
{
    "list": {
        "ExternalEvent": [
            {
                "eventTime": 1408900445,
                "phoneNo": 9739772042,
                "content": "FK OFFERS something",
                "carrier": "vodafone",
                "location": "Bangalore"
            },
            {
                "eventTime": 1408900448,
                "phoneNo": 9232977242,
                "content": "FK OFFERS something",
                "carrier": "vodafone",
                "location": "Bangalore"
            }
        ]
    }
}
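
One possible cause worth checking, stated as an assumption and not verified against this setup: if the JSON data format writes JSON through a mapped XML-to-JSON convention (as XStream's Jettison-based JSON driver does), the writer only sees repeated child elements, not the Java collection type, so a single child can come out as a bare object. The toy serializer below is a sketch of that effect only. MappedConventionSketch, mapped and typed are hypothetical names, and this is not the code of XStream, Jettison, or Camel.

```java
import java.util.Arrays;
import java.util.List;

// Toy illustration only: NOT the code of XStream, Jettison, or Camel.
class MappedConventionSketch {

    // Renders child elements called 'name' the way a mapped XML-to-JSON
    // convention typically does: exactly one child becomes a bare value,
    // any other count becomes an array.
    static String mapped(String name, List<String> children) {
        String value = children.size() == 1
                ? children.get(0)
                : "[" + String.join(",", children) + "]";
        return "{\"" + name + "\":" + value + "}";
    }

    // Renders the same children with the collection type taken into
    // account: always an array, whatever the size.
    static String typed(String name, List<String> children) {
        return "{\"" + name + "\":[" + String.join(",", children) + "]}";
    }

    public static void main(String[] args) {
        List<String> one = Arrays.asList("{\"phoneNo\":9739772042}");
        List<String> two = Arrays.asList("{\"phoneNo\":9739772042}", "{\"phoneNo\":9232977242}");

        System.out.println(mapped("ExternalEvent", one)); // single child: object, no array
        System.out.println(mapped("ExternalEvent", two)); // two children: array
        System.out.println(typed("ExternalEvent", one));  // single child: one-element array
    }
}
```

If this is what happens here, a JSON library that serializes from the Java types (Jackson or Gson, for example) would see a java.util.List and emit an array for a single element as well, although the surrounding "list" / "ExternalEvent" wrapper would likely look different.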


Any help / pointers would be appreciated.

Thanks,
Durga



--
View this message in context: 
http://camel.465427.n5.nabble.com/Issue-in-marshal-with-aggregation-tp5761125.html
Sent from the Camel - Users mailing list archive at Nabble.com.
