Hi

You can use a route policy to suspend/resume the route consumer,
depending on the number of in-flight exchanges for the route.
http://camel.apache.org/routepolicy

Then you can suspend when reaching 5, and resume when back to 4 or lower.
There is a ThrottlingInflightRoutePolicy for that, though its resume
watermark is percent based. Also consider setting maxMessagesPerPoll=1
on the file consumer endpoint, so it only grabs 1 file at a time.
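
A rough sketch of how that could look in your Spring XML. The bean id
"inflightPolicy" and the numbers are only illustrative assumptions; 80
percent of 5 should work out to a resume mark of 4. Please check the exact
threshold comparison (at 5 vs. above 5) against the Camel version you run.

```xml
<!-- Sketch only: the bean id and the values are assumptions for illustration -->
<bean id="inflightPolicy"
      class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
  <!-- upper watermark: suspend the consumer around 5 in-flight exchanges -->
  <property name="maxInflightExchanges" value="5"/>
  <!-- lower watermark as a percentage of the max: 80% of 5 = 4 -->
  <property name="resumePercentOfMax" value="80"/>
</bean>

<!-- same endpoint as in your route, with maxMessagesPerPoll=1 added -->
<endpoint id="fileBufferFrom"
  uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress&amp;maxMessagesPerPoll=1"/>

<camel:route id="processFromFileBuffer" routePolicyRef="inflightPolicy">
  <camel:from ref="fileBufferFrom"/>
  <!-- threads / split as in your original route -->
</camel:route>
```

With the policy on the route, the file consumer itself is paused while
enough files are in flight, so it should not keep polling and moving files
that no thread is free to process.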

About the thread pools: take a closer look at the difference between
Abort and Reject. You would need to use the one that ensures a
rollback when the thread pool is full, so the preMove file gets
rolled back (assuming that logic works).
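
For reference, the two settings that decide what happens on a full pool
can sit side by side on the threads element. This is only a sketch reusing
the values from your route; whether the rejection then actually triggers
the rollback of the preMove file is what needs to be verified on your
Camel version.

```xml
<!-- Sketch only: pool sizes and thread name copied from the original route -->
<camel:threads poolSize="5" maxPoolSize="5" maxQueueSize="1"
               threadName="file consumer"
               rejectedPolicy="Abort" callerRunsWhenRejected="false">
  <!-- convertBodyTo / split as in the original route -->
</camel:threads>
```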




On Fri, Sep 7, 2012 at 3:54 PM, Ralf Steppacher
<[email protected]> wrote:
> Hello Claus,
>
> If I want 5 files to be processed in parallel line by line, every line
> taken care of asynchronously, don't I need the
> <threads ...><camel:split ...></camel:split></threads> construct?
> If threads and split refer to the same thread pool, then of course I
> would not have to worry about finding the sweet spot in terms of number
> of threads in the two pools; to keep all threads in split-pool as busy
> as possible and have no thread in the threads-pool waiting. I prefer to
> keep them apart because that way I have more control over the number
> and order (small files first) in which they are processed.
>
> Using one thread pool would not solve my problem of the rejectedPolicy
> not being honored, would it? This really kills me. I am processing files
> ranging from 1kb to 200mb, sorted by their size. If the caller thread
> ends up with the 200mb file, all other processing stalls until the big
> file has been processed. I need to prevent processing of small files
> being blocked by large files.
>
> Is the file consumer supposed to preMove all files from the input
> directory to the inprogress directory (2.10.0 behavior) or just the ones
> it is actually going to process, i.e. number of threads (2.9.2
> behavior)?
>
> With rejectedPolicy="Abort":
> If the 2.10.0 preMove behavior is the expected behavior, is it expected
> that all files that could not be processed on the first poll of the file
> endpoint (file number > number of threads) stay in the inprogress
> directory for ever, never being processed (2.10.0 behavior) or should
> they be picked up as soon as a thread becomes free (2.9.2 behavior with
> respective preMove behavior) and the file endpoint polls again?
>
>
> Thanks!
> Ralf
>
>
> -----Original Message-----
> From: Claus Ibsen <[email protected]>
> Reply-to: [email protected]
> To: [email protected]
> Subject: Re: File and async/<camel:threads> woes
> Date: Thu, 6 Sep 2012 11:30:29 +0200
>
> Hi
>
> Why make it so complicated with 2 thread pools? The splitter can just
> refer to a custom thread pool profile / thread pool which you can
> customize as you want.
>
> Also the file consumer with preMove will move the file as soon as it
> starts routing.
>
>
> On Thu, Aug 30, 2012 at 5:24 PM, Ralf Steppacher
> <[email protected]> wrote:
>> Hello all,
>>
>> I have several issues defining an async route in Spring XML.
>> Can someone shed some light on what the expected behavior is and what
>> would be a bug and/or a misunderstanding on my side?
>>
>> The stripped down route:
>>
>> <endpoint id="fileBufferFrom"
>> uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress"
>>  />
>>
>> <camel:route id="processFromFileBuffer">
>>   <camel:from ref="fileBufferFrom" />
>>   <camel:threads poolSize="5" maxPoolSize="5" maxQueueSize="1"
>> threadName="file consumer" rejectedPolicy="Abort">
>>     <camel:convertBodyTo type="java.io.InputStream" />
>>     <camel:split streaming="true" parallelProcessing="true">
>>         <camel:tokenize token="\r\n" />
>>         <camel:to ref="mq.csv" />
>>     </camel:split>
>>   </camel:threads>
>> </camel:route>
>>
>>
>> Camel 2.9.2
>> If maxQueueSize > 0 then maxPoolSize + maxQueueSize files are moved from
>> the in-folder to the inprogress-folder. If there are more files they are
>> moved to the inprogress-folder as soon as the caller thread is free to
>> move them and worker threads are available.
>> Setting rejectedPolicy="Abort" or callerRunsWhenRejected="false" seems
>> to get ignored and the reject policy "CallerRuns" is applied. The source
>> is a file endpoint, so I assume the thread with the scanned directory as
>> its name is the caller thread? I can see that thread come to life in
>> VisualVM.
>>
>> If maxQueueSize = 0 then all files present in the input directory are
>> always moved to the inprogress-folder AND the rejectedPolicy="Abort" is
>> honored! There are always maxPoolSize files processed in parallel.
>>
>>
>> Camel 2.10.0
>> Independent of the value of maxQueueSize, if rejectedPolicy="Abort" then
>> the policy is honored, sort of. All files present in the input directory
>> are always moved to the inprogress-folder. Only maxPoolSize +
>> maxQueueSize files are being processed. All others stay untouched in the
>> "inprogress" folder!
>>
>> Using <camel:threadPool> and referencing that in <camel:threads> does
>> not change the above behavior for 2.9.2 or 2.10.0.
>>
>>
>> The behavior I expected was that with rejectedPolicy="Abort" always
>> maxPoolSize + maxQueueSize are moved from the in-folder to the
>> inprogress-folder and processed from there. As processing of one file
>> completes it gets deleted and a file from the in-folder is moved to the
>> inprogress-folder on the next poll of the in-folder.
>>
>>
>> Thanks!
>> Ralf
>
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: [email protected]
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
