Maybe you could put together a stand-alone unit test that shows this?

On Tue, Dec 3, 2013 at 1:04 PM, marcelcasado <[email protected]> wrote:
> I have an issue when using the Camel splitter and aggregator together: the
> messages continue down the route before all the messages from the split have
> been completely processed. In my route, the split-and-aggregated messages
> reach the "finally" block, whereas I expected the "finally" block to be hit
> only once, after all the split messages have completed. I tried different
> things but have not been able to keep the split messages from hitting the
> "finally" block.
>
> What I'm trying to do is split XML elements and then batch them using
> "aggregate" so that I can do batch processing. Ideally, when all the
> processing is done, I want to hit the "finally" block.
>
> Here are the Camel routes:
>
> fromF(readUri, inboundDataDir, pollingInterval, filterBeanName)
>     .routeId(routeId)
>     .doTry()
>         .to("bean:" + interfaceActivityReportEnricher)
>         .to("bean:" + tenantIdEnricher)
>         .split(stax(splitClass, false))
>             .streaming()
>             .to("bean:dataBatchEnricher")
>             .to("direct:" + batchingStrategy)
>     .endDoTry()
>     .doCatch(Exception.class)
>         .to("bean:errorProcessor?method=handleError(${file:absolute.path}, ${exception}, ${property."
>             + INTERFACE_ACTIVITY_REPORT_PROPERTY_NAME + "})")
>     .doFinally()
>         .to("bean:completedFileNameEnricher?method=enrichWithCompletedFileName(*,"
>             + inboundDataDir + "," + outboundDataDir + ")")
>         .setBody()
>             .simple("${property." + INTERFACE_ACTIVITY_REPORT_PROPERTY_NAME + ".report}")
>         .marshal(jaxbDataFormat)
>         .toF(writeUri, outboundDataDir, interfaceActivityReportDir)
>     .end();
>
> // Compare string contents with equals(); == only compares references.
> if ("sizeStrategy-TenantIdCorrelation".equals(batchingStrategy)) {
>     from("direct:sizeStrategy-TenantIdCorrelation")
>         // Aggregate all exchanges correlated by the TENANT_ID_PROPERTY_NAME property.
>         // Aggregate them using the ArrayListAggregationStrategy; after N messages
>         // have been aggregated, complete the aggregation and send it to the processor.
>         .aggregate(property(TENANT_ID_PROPERTY_NAME), new ArrayListAggregationStrategy())
>             .aggregationRepository(repo)
>             .completionSize(8)
>             .completionTimeout(10000)
>             .forceCompletionOnStop()
>             .parallelProcessing()
>             .to("bean:" + importProcessor)
>         .end();
> }
>
> Thanks,
>
> -Marcel
>
> --
> View this message in context:
> http://camel.465427.n5.nabble.com/Issues-using-camel-split-and-aggregator-together-tp5744266.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
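
For reference, the behaviour being asked for above (batch the split elements per tenant, complete a batch at a fixed size, then run the final step exactly once after everything) can be sketched in plain Java. This is only an illustration of the intended semantics under my own assumptions, not Camel code and not a fix for the route; the class name BatchThenFinally, the method batchByKey, and the tenant ids are all made up for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; none of these names are Camel APIs.
class BatchThenFinally {

    // Groups items by a correlation key (item[0]) and completes a batch
    // whenever a group reaches batchSize, loosely analogous to completionSize.
    // Groups still incomplete when the input ends are flushed as smaller batches.
    static List<List<String>> batchByKey(List<String[]> items, int batchSize) {
        Map<String, List<String>> pending = new LinkedHashMap<>();
        List<List<String>> completed = new ArrayList<>();
        for (String[] item : items) {
            // item[0] is the (hypothetical) tenant id, item[1] the payload
            List<String> group = pending.computeIfAbsent(item[0], k -> new ArrayList<>());
            group.add(item[1]);
            if (group.size() == batchSize) {
                completed.add(group);
                pending.remove(item[0]);
            }
        }
        // Flush whatever is left once all input has been consumed.
        completed.addAll(pending.values());
        return completed;
    }

    public static void main(String[] args) {
        List<String[]> items = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            items.add(new String[] {"tenantA", "a" + i});
        }
        items.add(new String[] {"tenantB", "b0"});

        int finallyCount = 0;
        try {
            for (List<String> batch : batchByKey(items, 8)) {
                System.out.println("processing batch of " + batch.size());
            }
        } finally {
            // Reached once, only after every batch has been processed.
            finallyCount++;
        }
        System.out.println("finally ran " + finallyCount + " time(s)");
    }
}
```

With ten items for one tenant and one item for another, this yields batches of 8, 2 and 1, and the final step runs once. A stand-alone Camel test that asserts the same thing against the real route would make it much easier to see where the behaviour diverges.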
--
Christian Posta
http://www.christianposta.com/blog
twitter: @christianposta
