I'm having trouble getting a slightly complex split/unmarshal/bean-augment/aggregate pipeline to complete the way I want it to. It completes with a completionTimeout set, but I want it to complete based on all messages from the batch getting through. Without the completionTimeout it hangs forever.
When I put a breakpoint in the stopTimer bean function, all of the rows make it through, but it is the timer that causes it to complete. Here is the route; the aggregators are at the end of the post. The bean funcs do some custom business logic, either augmenting or reordering the CSV fields, and are passing through all messages. Is there something wrong with my aggregators or my routes, or both, that is preventing it from completing?

from("direct:dqgenericprocessorreorderaddtableId")
    .routeId("step2reorderaddtableid")
    .bean(dqCamelBeanFuncs, "startTimer")
    // initialize the bean only once
    .bean(dqCamelBeanFuncs, "init")
    // Split apart for un-marshaling, then reorder and add the tableid
    .split(body().tokenize("\n")).streaming().parallelProcessing().threads(40, 80)
    .unmarshal(customCsvMarshal)
    .bean(dqCamelBeanFuncs, "reorderAndConvertFieldsUpdateExchange")
    .to("direct:dqgenericprocessorreassembleaddtmcs");

from("direct:dqgenericprocessorreassembleaddtmcs")
    .routeId("step3reassembleaddtmcs")
    // Reassemble into nice batch sizes for adding TMCs
    .aggregate(new DQCustomeCSVAggregationExchange())
        .simple("$body[" + DQCamelBeanFuncs.FieldOrder.tableid.toString() + "]")
    .completionSize(500).completionTimeout(1000)
    .parallelProcessing().threads(40, 80)
    .bean(dqCamelBeanFuncs, "addTMCs")
    .to("direct:dqgenericprocessorreassemblintoone");

from("direct:dqgenericprocessorreassemblintoone")
    .routeId("step4reassembleintoone")
    // Reassemble into 1 piece
    .aggregate(new DQCustomeMultirowCSVAggregationExchange())
        .simple("$body[" + DQCamelBeanFuncs.FieldOrder.tableid.toString() + "]")
    .completionTimeout(10000)
    .parallelProcessing().threads(40, 80)
    .bean(dqCamelBeanFuncs, "stopTimer");

public class DQCustomeMultirowCSVAggregationExchange implements AggregationStrategy {

    protected Logger log = LoggerFactory.getLogger(getClass());

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        List<List<String>> csvList = null;
        if (oldExchange == null) {
            // Create the new multirow body
            // and set the body
            csvList = new ArrayList<List<String>>();
            csvList.addAll((List<List<String>>) newExchange.getIn().getBody());
            newExchange.getIn().setBody(csvList);
            return newExchange;
        }
        // get the csvList
        csvList = (ArrayList<List<String>>) oldExchange.getIn().getBody();
        // Add the rows to the list
        csvList.addAll((ArrayList<List<String>>) newExchange.getIn().getBody());
        newExchange.getIn().setBody(csvList);
        return newExchange;
    }
}

public class DQCustomeCSVAggregationExchange implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        List<List<String>> csvList = null;
        if (oldExchange == null) {
            // switch to the body type we want
            csvList = new ArrayList<List<String>>();
            // Add the row
            csvList.add((List<String>) newExchange.getIn().getBody());
            // Set the exchange's body
            newExchange.getIn().setBody(csvList);
            return newExchange;
        }
        // get the csvList
        csvList = (ArrayList<List<String>>) oldExchange.getIn().getBody();
        // Add the row to the list
        csvList.add((List<String>) newExchange.getIn().getBody());
        newExchange.getIn().setBody(csvList);
        return newExchange;
    }
}

--
View this message in context: http://camel.465427.n5.nabble.com/CustomAggregation-of-CSV-not-completing-don-t-want-to-complete-using-completionTimeout-tp4469928p4469928.html
Sent from the Camel - Users mailing list archive at Nabble.com.
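For completing on end-of-batch rather than on a timeout, one approach (not from the post; a hedged sketch) is to give the aggregator a completionPredicate keyed off the CamelSplitComplete exchange property that the splitter sets on its last split exchange, together with eagerCheckCompletion() so the predicate is evaluated against the incoming exchange rather than the aggregated one. Route and bean names below are taken from the post; the predicate itself is an assumption, and note that with parallelProcessing on the splitter the exchange carrying CamelSplitComplete is not guaranteed to reach the aggregator last:

```java
// Sketch only: complete the aggregation when the exchange the splitter
// flagged as last (property CamelSplitComplete == true) arrives. With a
// parallel splitter upstream, that exchange may not arrive last, so treat
// this as a starting point rather than a guaranteed fix.
from("direct:dqgenericprocessorreassemblintoone")
    .routeId("step4reassembleintoone")
    .aggregate(new DQCustomeMultirowCSVAggregationExchange())
        .simple("$body[" + DQCamelBeanFuncs.FieldOrder.tableid.toString() + "]")
    // evaluate the predicate against each incoming exchange, pre-aggregation
    .eagerCheckCompletion()
    .completionPredicate(property(Exchange.SPLIT_COMPLETE).isEqualTo(true))
    .bean(dqCamelBeanFuncs, "stopTimer");
```

This keeps the completionTimeout out of the happy path entirely; it could still be left in place as a safety net for lost messages.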