The exchange property of the Exchange.BATCH_SIZE will be used when the aggregation call the isCompeleted(), So please don't change it dynamically.

On 4/27/12 11:25 PM, ebinsingh wrote:
Hi All,

I am trying to aggregate large number of xml files into files of 50000
records.
I am getting java.lang.OutOfMemoryError - Java heap space error.

I am trying to see if there are any leaks but to my eyes i do not see any.

Appreciate your thoughts on this.

Aggreation logic:

public class GlobalAggrStratergy implements AggregationStrategy {
        private static Logger log = Logger.getLogger(GlobalAggrStratergy.class);
        int counter = 0;
        @Override
        public Exchange aggregate(Exchange exchange1, Exchange exchange2) {
                try{
                        StringBuilder builder;
                if (exchange1 == null || null == exchange1.getIn().getBody()) {
                        builder = new StringBuilder();
                        exchange1 = new DefaultExchange(new 
DefaultCamelContext());
                    exchange1.getIn().setBody(builder);
                }
                builder = exchange1.getIn().getBody(StringBuilder.class);
                builder.append(exchange2.getIn().getBody()+"\n");
                exchange1.getIn().setBody(builder);
                exchange1.getIn().setHeader(Exchange.FILE_NAME_ONLY,
exchange2.getProperty(Exchange.FILE_NAME_ONLY));
                counter++;
                }catch(Exception ex){
                        log.error("Error aggregating", ex);
                }
                exchange1.setProperty(Exchange.BATCH_SIZE, counter);
                if(counter>= 50000)
                        counter = 0;
         return exchange1;
        }


Route configuration:

        public void configure() throws Exception
        {
                from("direct:producerQueue").log("File name: 
${in.header.fileName}")
                .setProperty(Exchange.FILE_NAME_ONLY, 
simple("${file:onlyname.noext}"))
                .split().tokenizeXML("IPDR").streaming()
                .aggregate(header("messageId"), new
GlobalAggrStratergy()).completionSize(50000).completionTimeout(20000)
                .process(new IPDRHeaderFooterProcessor())
                .to(IPDRUtil.getInstance().getProperty("IPDROutputDir"));
        }

Thanks&  regards,
Ebe

--
View this message in context: 
http://camel.465427.n5.nabble.com/Java-heap-space-issue-with-Aggregation-tp5670608p5670608.html
Sent from the Camel - Users mailing list archive at Nabble.com.



--
Willem
----------------------------------
CamelOne 2012 Conference, May 15-16, 2012: http://camelone.com
FuseSource
Web: http://www.fusesource.com
Blog:    http://willemjiang.blogspot.com (English)
         http://jnn.javaeye.com (Chinese)
Twitter: willemjiang
Weibo: willemjiang

Reply via email to