Yes, I did that. I changed it to a higher value but am still facing the same issue. I can see the aggregator being executed. When I print the exchange that I return from the aggregator, I can see that it contains the aggregated message. But on the client end the response is not correct.

What makes it tricky is that for 8 out of 10 requests the client application makes, it gets valid aggregated responses. It's only for 1-2 requests that it fails. I tried sending requests both in parallel and sequentially, and the results are the same (so we can rule out multithreading issues such as objects being overwritten).

I think the problem is somewhere between the point "aggregator finished its job" -->to--> "camel returns this exchange as a WS response". Unfortunately I don’t have much understanding of how that happens.

Also, is there some blog or documentation about how Apache Camel picks up the aggregator's response (i.e. the exchange object) and converts it to a SOAP response?

Thanks
Vivek

-----Original Message-----
From: Willem Jiang [mailto:[email protected]]
Sent: Tuesday, August 17, 2010 3:01 AM
To: [email protected]
Subject: Re: Aggregator response not being sent consistently

Hi,

You set the aggregation completionTimeout(5000L); maybe you could remove it or set a longer time.

Willem

vivek k wrote:
> I have mapped a web service as the start endpoint and am multicasting the
> request to multiple sources, aggregating the responses and sending the
> consolidated response back via the WS.
>
> The WS is a sync web service.
>
> The problem is that the WS is not always sending the response that is returned by
> the aggregator. It sometimes returns the final exchange that was sent to the
> aggregator as the response.
>
> The route is defined as:
>
> from("cxf:bean:icDataService?dataFormat=POJO")
>     .process((SearchResourceRequestProcessor)
>         applicationContext.getBean("searchResourceRequestProcessor"))
>     .multicast()
>     .parallelProcessing()
>     .executorServiceRef("customRequestThreadPoolExecutor")
>     .to("direct:iclSystemEndpoint", "direct:lisSystemEndpoint");
>
> from("direct:iclSystemEndpoint")
>     .to(propertyMapper.getICL_WS_ENDPOINT())
>     .to("direct:aggregation-strategy-icl-lis");
>
> from("direct:lisSystemEndpoint")
>     .to(propertyMapper.getLIS_WS_ENDPOINT())
>     .to("direct:aggregation-strategy-icl-lis");
>
> from("direct:aggregation-strategy-icl-lis")
>     .aggregate(header("CORELATION_ID"),
>         (SearchResourceAggregationStrategy)
>         applicationContext.getBean("aggregatorStrategy"))
>     .completionSize(2)
>     .executorServiceRef("customRequestThreadPoolExecutor")
>     .completionTimeout(5000L)
>     .process(new Processor() {
>         public void process(Exchange exchange) throws Exception {
>             LOG.info("FINAL RETURN ~~ NO ROUTES LEFT -> " + exchange);
>         }
>     });
>
> When I run this with 10-12 WS requests at one time, I get 8-9 aggregated
> responses and 1-2 responses where the result before aggregation was returned
> (that is, the list of multiple responses appended to the exchange).
>
> For every request I can see the last process logging “FINAL RETURN…”
> successfully with the correct response; somehow the WS response is the one that
> is not getting populated correctly.
>
> The Aggregator code:
>
> public class SearchResourceAggregationStrategy implements AggregationStrategy {
>
>     private static final Log LOG =
>         LogFactory.getLog(SearchResourceAggregationStrategy.class);
>     private final int totalResponseExpected = 2;
>     protected JoinICLAndLISResponse joinICLAndLISResponse;
>
>     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>
>         List<SearchResourceResponse> responses;
>
>         if (oldExchange == null)
>             responses = new ArrayList<SearchResourceResponse>();
>         else
>             responses = (List<SearchResourceResponse>)
>                 oldExchange.getIn().getBody();
>
>         LOG.debug("New exchange received " + newExchange);
>
>         responses.add((SearchResourceResponse)
>             newExchange.getIn().getBody(Message.class).getBody());
>         newExchange.getIn().setBody(responses);
>
>         if (responses.size() == totalResponseExpected) {
>             // will join the responses in one response - This is working fine.
>             newExchange = joinICLAndLISResponse.process(newExchange);
>         }
>
>         return newExchange;
>     }
>
>     public void setJoinICLAndLISResponse(JoinICLAndLISResponse joinICLAndLISResponse) {
>         this.joinICLAndLISResponse = joinICLAndLISResponse;
>     }
>
> }
>
> Logged response from the JUnit test:
>
> [16/08/10 10:42:55:055 MDT] ICL-RnR INFO
> init.TestSearchForDeviceParallelMultipleRequests: RESPONSE ~~~~~> Message:
> <LogicalDevice>
>
> <commonName>552987dc-19d1-4a49-94b3-1da3ed207b991281976975442552987dc-19d1-4a49-94b3-1da3ed207b991281976975442</commonName>
> </LogicalDevice> THIS WORKED FINE - RETURNED AGGREGATED RESPONSE
>
> [16/08/10 10:42:55:055 MDT] ICL-RnR INFO
> init.TestSearchForDeviceParallelMultipleRequests: RESPONSE ~~~~~> Message:
> <LogicalDevice>
>
> <commonName>b8b17a83-eeef-42b2-a01a-c8523f69db551281976975458b8b17a83-eeef-42b2-a01a-c8523f69db551281976975474</commonName>
> </LogicalDevice> THIS WORKED FINE - RETURNED AGGREGATED RESPONSE
>
> [16/08/10 10:42:55:055 MDT] ICL-RnR INFO
> init.TestSearchForDeviceParallelMultipleRequests: RESPONSE ~~~~~> Message:
> [<LogicalDevice>
> <commonName>7f70aa4d-8c08-4385-ad22-5f3b6e577b051281976975505</commonName>
> </LogicalDevice>, <LogicalDevice>
> <commonName>7f70aa4d-8c08-4385-ad22-5f3b6e577b051281976975505</commonName>
> </LogicalDevice>] THIS DIDN'T WORK - THE RESPONSE HERE IS THE INPUT TO THE
> AGGREGATOR. THE AGGREGATOR THEN AGGREGATED IT TO ONE RESPONSE (I CAN SEE IT
> IN THE LOGS), BUT STILL THIS WAS RETURNED !!!
>
> Any idea why I am seeing this unpredictable behavior?
> Thanks.
> Vivek
>
This communication is the property of Qwest and may contain confidential or privileged information. Unauthorized use of this communication is strictly prohibited and may be unlawful. If you have received this communication in error, please immediately notify the sender by reply e-mail and destroy all copies of the communication and any attachments.
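
For readers following the thread, here is a minimal, Camel-free sketch of the collect-then-join step that the quoted SearchResourceAggregationStrategy performs. The quoted strategy sets the accumulated list as the body of the incoming exchange before joining, and the failing response has exactly that list shape, so this variant keeps the intermediate list out of the incoming object. It is an illustration under assumptions, not a confirmed fix: replies are modeled as plain strings, and `CollectThenJoin`, `collect`, `isComplete` and `join` are hypothetical names. The concatenating `join` only mimics what the logged commonName values suggest.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the aggregation logic; no Camel types are used.
final class CollectThenJoin {

    // Mirrors totalResponseExpected = 2 from the quoted strategy.
    static final int TOTAL_RESPONSES_EXPECTED = 2;

    // Returns a NEW read-only list containing the previous replies plus the
    // new one. Neither the accumulated list nor the reply is modified.
    static List<String> collect(List<String> accumulated, String reply) {
        List<String> next = new ArrayList<>();
        if (accumulated != null) {
            next.addAll(accumulated);
        }
        next.add(reply);
        return Collections.unmodifiableList(next);
    }

    // True once every expected reply has arrived.
    static boolean isComplete(List<String> accumulated) {
        return accumulated.size() == TOTAL_RESPONSES_EXPECTED;
    }

    // Assumed join rule: concatenate the replies in arrival order.
    static String join(List<String> replies) {
        return String.join("", replies);
    }

    public static void main(String[] args) {
        List<String> replies = collect(null, "icl-reply");
        replies = collect(replies, "lis-reply");
        if (isComplete(replies)) {
            System.out.println(join(replies));
        }
    }
}
```

Only the joined value leaves this step. The intermediate list is never attached to an object that another part of the route might read as the response.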
