To me it looks like the entire CSV file is converted to activemq messages which finally causes the OOM. The same problem exists with Camel SEDA queues as the messages need to be kept in memory until handled. I guess you need to set a queue size limit which blocks your csv sending process if no space is left.

see: http://activemq.apache.org/producer-flow-control.html

Jens


Am 29/03/16 um 10:12 schrieb Michele:
Hi,

thanks a lot for your reply.

I changed my route introducing  Throttler Pattern like this:

<route id="FileReader_Route">
     <from uri="file:incoming?....." />
      <split streaming="true" parallelProcessing="true">
            <tokenize token="\n" />
           <unmarshal ref="IncomingCSVFileDataFormat" />
            .....
            <to uri="activemq:queue:incomingTickets" />
      </split>
</route>

<route id="ProcessTicket_Route">
      <from uri="activemq:queue:incomingTickets" />
      <throttle timePeriodMillis="10000" asyncDelayed="true">
          <constant>5</constant>
           <process ref="DataProcessor" />
            <marshal ref="Gson" />
            <to
uri="jetty:http://host/rs/v1.0/ticket?jettyHttpBindingRef=CustomJettyHttpBinding";
/>
      </throttle>
</route>

But now, I have an error java.lang.OutOfMemoryError: Java heap space on
ActiveMQ... I defined a PooledConnectionFactory with concurrentConsumers
equals 5 and maxConnections equals 3

.....
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor
Elapsed (ms)
[FileRetriever_Rout] [FileRetriever_Rout]
[file://C:/inbox?include=%5EEdenred_%5B0-9%5D%7B8%7D.csv&s] [     46034]
[FileRetriever_Rout] [unmarshal3        ]
[unmarshal[ref:IncomingFileDataFormat]
] [         0]
[FileRetriever_Rout] [setHeader17       ] [setHeader[CamelSplitIndex]
] [         0]
[FileRetriever_Rout] [setBody1          ] [setBody[simple{${body[0]}}]
] [        10]
[FileRetriever_Rout] [choice2           ]
[when[simple{${body[INGENICO_OPERATION_ID]} regex '[0-9]+'}]choice[]
] [      1634]
[FileRetriever_Rout] [log15             ] [log
] [         0]
[FileRetriever_Rout] [to6               ]
[activemq:queue:IF_INGESTATE_Inbound?destination.consumer.dispatchAsync=true&de]
[      1634]

Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
        Id                  ID-FGBAL201530-50191-1459236743101-1-259
        ExchangePattern     InOnly
        Headers             {breadcrumbId=IMP-IF-Ingestate-20160329-093400,
CamelFileAbsolute=true,
CamelFileAbsolutePath=C:\CRT-2.0\IF-Ingestate\inbox\Edenred_15092015.csv,
CamelFileContentType=application/vnd.ms-excel,
CamelFileLastModified=1458774180380, CamelFileLength=6961789,
CamelFileName=Edenred_15092015.csv,
CamelFileNameConsumed=Edenred_15092015.csv,
CamelFileNameOnly=Edenred_15092015.csv,
CamelFileParent=C:\CRT-2.0\IF-Ingestate\inbox,
CamelFilePath=C:\CRT-2.0\IF-Ingestate\inbox\Edenred_15092015.csv,
CamelFileRelativePath=Edenred_15092015.csv, CamelRedelivered=false,
CamelRedeliveryCounter=0, CamelSplitIndex=59,
CrmRSActionPath=/tk_rt_ticket/ingestate/maintenance,
ImportDateTime=20160329-093400,
MsgCorrelationId=Inbound_INGESTATE_20160329-093400}
        BodyType            java.util.HashMap
        Body                {TICKET_ID=00108560, INGENICO_OPERATION_ID=1721710,
TERMINAL_NUMBEROFCONTACTS=1, TICKET_STATUS=1,
TERMINAL_LAST_CONTACT_DATE=2012-11-03 01:57:26.360,
TERMINAL_TECHNOLOGY=TELIUM, TERMINAL_SN=0000107370643471,
TERMINAL_INGESTATE_ID=ICT-TICKET-RESTAURANT:91215343,
TICKET_REGISTRATION_DATE=2012-11-02 10:08:00.000, TERMINAL_PN=M40,
TERMINAL_FIRST_CONTACT_DATE=2012-11-03 01:56:08.343, TERMINAL_ID=91215343,
RECORDER=EDENRED}
]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.CamelExecutionException: Exception occurred during
execution on the exchange: Exchange[Message: {TICKET_ID=00108560,
INGENICO_OPERATION_ID=1721710, TERMINAL_NUMBEROFCONTACTS=1, TICKET_STATUS=1,
TERMINAL_LAST_CONTACT_DATE=2012-11-03 01:57:26.360,
TERMINAL_TECHNOLOGY=TELIUM, TERMINAL_SN=0000107370643471,
TERMINAL_INGESTATE_ID=ICT-TICKET-RESTAURANT:91215343,
TICKET_REGISTRATION_DATE=2012-11-02 10:08:00.000, TERMINAL_PN=M40,
TERMINAL_FIRST_CONTACT_DATE=2012-11-03 01:56:08.343, TERMINAL_ID=91215343,
RECORDER=EDENRED}]
        at
org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1635)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.impl.DefaultExchange.setException(DefaultExchange.java:308)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:158)[209:org.apache.camel.camel-jms:2.15.1.redhat-620133]
        at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:129)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:652)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:580)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:227)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.Splitter.process(Splitter.java:104)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:448)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:435)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:211)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:175)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:174)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:101)[198:org.apache.camel.camel-core:2.15.1.redhat-620133]
        at
org.apache.camel.pollconsumer.quartz2.QuartzScheduledPollConsumerJob.execute(QuartzScheduledPollConsumerJob.java:59)[279:org.apache.camel.camel-quartz2:2.15.1.redhat-620133]
        at
org.quartz.core.JobRunShell.run(JobRunShell.java:202)[278:org.quartz-scheduler.quartz:2.2.1]
        at
org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)[278:org.quartz-scheduler.quartz:2.2.1]
Caused by: java.lang.OutOfMemoryError: Java heap space


Any idea?

How configure active mq component in order to manage the load in JBoss Fuse
6.2 context? Or Is there something wrong in my system routing?

Thanks in advance

Best Regards
Michele








--
View this message in context: 
http://camel.465427.n5.nabble.com/Best-Strategy-to-process-a-large-number-of-rows-in-File-tp5779856p5779935.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to