[ https://issues.apache.org/jira/browse/CAMEL-13287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen resolved CAMEL-13287. --------------------------------- Resolution: Fixed > AggregationStrategy - Access original exchange in aggregate method > ------------------------------------------------------------------- > > Key: CAMEL-13287 > URL: https://issues.apache.org/jira/browse/CAMEL-13287 > Project: Camel > Issue Type: New Feature > Components: camel-core > Reporter: Balazs Szeti > Assignee: Claus Ibsen > Priority: Major > Fix For: 3.0.0, 3.0.0.M5 > > > For aggregation after multicast/splitter the original exchange should > optionally be available in the aggregate method. > There are several use cases when we would like to go-on processing the > original exchange after multicast, but we'd like to enrich it with the > outcome of the called routes. For example: > {code:java} > rest() > .get("orders/{orderId}") > .route() > .to("direct:getOrderDetails") //get UserId, ItemId > .setBody(method(this,"createResponsePojo")) > .multicast(new MyAggregationStrategy()) > .to("direct:getUserDetails") > .to("direct:getDeliveryAddress") > .to("direct:getItemDetails") > .end() > ; > {code} > If any of the called routes fail, we still would like return a partial > response in our service (this is a common requirement in case of > microservices). The MyAggregationStrategy should simply enrich the > ResponePojo object from the original exchange somehow with the new exchanges > coming from the sub-routes. See this example: > {code:java} > public class MyAggregationStrategy implements AggregationStrategy { > public Exchange aggregateWithOriginal(Exchange oldExchange, Exchange > newExchange, Exchange originalExchange) { > Exchange exchange = oldExchange != null ? oldExchange : originalExchange; > ResponsePojo response = exchange.getMessage().getBody(ResponsePojo.class); > if (! newExchange.isFailed()) { > // ... Add newExchange body somehow to ResponsePojo object... > } > return exchange; > } > ... > } > {code} > Currently only the exchanges from the "sub-routes" are available during > aggregation, so the exchange after the aggregate will be one (the first) of > those. This comes with multiple problems: > * Though the exchanges in the sub-routes are copies of the original > exchange, sub routes make modifications: modify headers, modify properties, > etc. Usually we don't want to see all these set by a sub-route on the final > aggregated exchange. It's only noise. "Whatever happens in the sub-route, > should stay in the sub-route." - We only want to see on the aggregated > exchange what we "took" intentionally from the sub-route exchanges. > * If we use stopOnException(true) our life is simple because we usually > don't have to worry about exceptions in aggregate, we will stop anyway. The > aggregation logic can become complicated if we want to go on with processing > in case of errors. The first time aggregate() is called the oldExchange is > null, so we usually take the newExchange as the return value. If this > exchange has an Exception, we need to "clean" it first, otherwise the error > handler will kick in after aggregation. This is non-trivial. > h3. Suggested approach > Let's extend the AggregationStrategy interface with a new method that takes > three exchanges. This should be called after Multicast EIP (Enrich EIP is > simple, it only has two exchanges). > With a default implementations we can keep the interface compatible: > {code:java} > public interface AggregationStrategy { > /** > * Aggregates an old, a new and the original exchange together to create a > single combined exchange. > * > * @param oldExchange the oldest exchange, which is the returned value of the > previous aggregation on null. > * @param newExchange the newest exchange > * @param originalExchange the original exchange before Multicast or Splitter > EIP. Null in case of Enrich EIP. > * @return a combined exchange, favor returning the oldExchange > */ > default Exchange aggregateWithOriginal(Exchange oldExchange, Exchange > newExchange, Exchange originalExchange) { > return aggregate(oldExchange, newExchange); > }; > //Maybe we should have a default implementation here too so one can only > implement aggregateWithOriginal() > Exchange aggregate(Exchange oldExchange, Exchange newExchange); > ... > } > {code} > {code:java} > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)