Well, turns out that the only reliable way of preventing the file
consumer thread from being assigned tasks is to set the queue depth of
the executor pool to "unlimited" (maxQueueSize="0") and use the
throttling route policy and sufficiently low maxMessagesPerPoll at the
file endpoint to manage the queue depth.
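
For illustration, a minimal sketch of what such a setup might look like in
Spring XML, based on the route from my original post. The bean id
"throttlePolicy" and the pool size of 10 are assumptions for the example;
the policy bean is assumed to be defined elsewhere in the Spring context,
and maxMessagesPerPoll is assumed to be set on the fileBufferFrom endpoint.

```xml
<!-- Sketch only. "throttlePolicy" is a hypothetical bean id for a
     ThrottlingInflightRoutePolicy defined elsewhere in the Spring context. -->
<camel:route id="processFromFileBuffer" routePolicyRef="throttlePolicy">
  <!-- fileBufferFrom is assumed to carry a low maxMessagesPerPoll, e.g. 1 -->
  <camel:from ref="fileBufferFrom" />
  <!-- maxQueueSize="0" as described above; poolSize 10 is an illustrative value -->
  <camel:threads poolSize="10" maxPoolSize="10" maxQueueSize="0"
                 threadName="file consumer">
    <camel:convertBodyTo type="java.io.InputStream" />
    <camel:split streaming="true" parallelProcessing="true">
      <camel:tokenize token="\r\n" />
      <camel:to ref="mq.csv" />
    </camel:split>
  </camel:threads>
</camel:route>
```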

Ralf



-----Original Message-----
From: Ralf Steppacher <[email protected]>
Reply-to: [email protected]
To: [email protected]
Subject: Re: File and async/<camel:threads> woes
Date: Tue, 25 Sep 2012 19:27:17 +0200

Claus,

I finally implemented your suggestion with the route policy and was able
to work around the problem of the caller thread getting blocked by a
large task. Indeed the throttling affects the number of files being
processed, not the number of exchanges created by the splitter.
In my case, if the number of threads in the pool is about double the
suspend threshold I can prevent the caller thread from being blocked.
However, if the pattern of file sizes changes, this might not be true
anymore.

I also realized that I have to set useFixedDelay=false on the file
endpoint to allow for continuous feeding of files for async processing.
The default of useFixedDelay=true also leads to large files blocking
processing of pending files (by way of preventing the file endpoint from
polling).
I double-checked whether this had been my root problem all along, but it
was not. Without the throttling route policy, processing still gets stuck
on large files even with useFixedDelay=false.
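
As a sketch, this is the endpoint from the original post with only
useFixedDelay=false appended; all other options are unchanged and the
values are not tuned recommendations.

```xml
<!-- Sketch only: original endpoint options plus useFixedDelay=false -->
<endpoint id="fileBufferFrom"
    uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress&amp;useFixedDelay=false" />
```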

All the above I have tested with Camel 2.9.2. Due to time constraints I
have not yet given 2.10.x another shot with this.


Ralf


-----Original Message-----
From: Claus Ibsen <[email protected]>
Reply-to: [email protected]
To: [email protected]
Subject: Re: File and async/<camel:threads> woes
Date: Sun, 9 Sep 2012 10:33:45 +0200

Hi

You can use a route policy to suspend/resume the route consumer,
depending on the number of in-flight exchanges for the route.
http://camel.apache.org/routepolicy

Then you can suspend when reaching 5, and resume when back to 4 or lower.
There is a ThrottlingInflightRoutePolicy for that, though its
watermark is percent based. Also consider setting maxMessagesPerPoll=1
on the file consumer endpoint, so it only grabs 1 file at a time.
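
A minimal sketch of such a policy as a Spring bean might look like the
following. The bean id and the values are illustrative assumptions; the
property names are those of the Camel 2.x ThrottlingInflightRoutePolicy
as I understand them, so check them against your version.

```xml
<!-- Sketch only: bean id and values are illustrative.
     With maxInflightExchanges=5 and resumePercentOfMax=80 the resume
     watermark works out to 4 in-flight exchanges (80% of 5). -->
<bean id="throttlePolicy"
      class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
  <property name="scope" value="Route" />
  <property name="maxInflightExchanges" value="5" />
  <property name="resumePercentOfMax" value="80" />
</bean>
```

The bean would then be attached to the route with routePolicyRef, and
maxMessagesPerPoll=1 added to the file endpoint URI.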

About the thread pools: take a closer look at the difference between
Abort and Reject. You would need to use the one that ensures a
rollback when the thread pool is full, so the preMove file gets
rolled back (assuming that logic works).




On Fri, Sep 7, 2012 at 3:54 PM, Ralf Steppacher
<[email protected]> wrote:
> Hello Claus,
>
> If I want 5 files to be processed in parallel line by line, every line
> taken care of asynchronously, don't I need the
> <threads ...><camel:split ...></camel:split></threads> construct?
> If threads and split refer to the same thread pool, then of course I
> would not have to worry about finding the sweet spot in terms of number
> of threads in the two pools; to keep all threads in split-pool as busy
> as possible and have no thread in the threads-pool waiting. I prefer to
> keep them apart because that way I have more control about the number
> and order (small files first) in which they are processed.
>
> Using one thread pool would not solve my problem of the rejectedPolicy
> not being honored, would it? This really kills me. I am processing files
> ranging from 1kb to 200mb, sorted by their size. If the caller thread
> ends up with the 200mb file, all other processing stalls until the big
> file has been processed. I need to prevent processing of small files
> being blocked by large files.
>
> Is the file consumer supposed to preMove all files from the input
> directory to the inprogress directory (2.10.0 behavior) or just the ones
> it is actually going to process, i.e. number of threads (2.9.2
> behavior)?
>
> With rejectedPolicy="Abort":
> If the 2.10.0 preMove behavior is the expected behavior, is it expected
> that all files that could not be processed on the first poll of the file
> endpoint (file number > number of threads) stay in the inprogress
> directory for ever, never being processed (2.10.0 behavior) or should
> they be picked up as soon as a thread becomes free (2.9.2 behavior with
> respective preMove behavior) and the file endpoint polls again?
>
>
> Thanks!
> Ralf
>
>
> -----Original Message-----
> From: Claus Ibsen <[email protected]>
> Reply-to: [email protected]
> To: [email protected]
> Subject: Re: File and async/<camel:threads> woes
> Date: Thu, 6 Sep 2012 11:30:29 +0200
>
> Hi
>
> Why make it so complicated with 2 thread pools? The splitter can just
> refer to a custom thread pool profile / thread pool which you can
> customize as you want.
>
> Also, the file consumer with preMove will move the file as soon as it
> starts routing.
>
>
> On Thu, Aug 30, 2012 at 5:24 PM, Ralf Steppacher
> <[email protected]> wrote:
>> Hello all,
>>
>> I have several issues defining an async route in Spring XML.
>> Can someone shed some light on what the expected behavior is and what
>> would be a bug and/or a misunderstanding on my side?
>>
>> The stripped down route:
>>
>> <endpoint id="fileBufferFrom"
>> uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress"
>>  />
>>
>> <camel:route id="processFromFileBuffer">
>>   <camel:from ref="fileBufferFrom" />
>>   <camel:threads poolSize="5" maxPoolSize="5" maxQueueSize="1"
>> threadName="file consumer" rejectedPolicy="Abort">
>>     <camel:convertBodyTo type="java.io.InputStream" />
>>     <camel:split streaming="true" parallelProcessing="true">
>>         <camel:tokenize token="\r\n" />
>>         <camel:to ref="mq.csv" />
>>     </camel:split>
>>   </camel:threads>
>> </camel:route>
>>
>>
>> Camel 2.9.2
>> If maxQueueSize > 0 then maxPoolSize + maxQueueSize files are moved from
>> the in-folder to the inprogress-folder. If there are more files they are
>> moved to the inprogress-folder as soon as the caller thread is free to
>> move them and worker threads are available.
>> Setting rejectedPolicy="Abort" or callerRunsWhenRejected="false" seems
>> to get ignored and the reject policy "CallerRuns" is applied. The source
>> is a file endpoint, so I assume the thread with the scanned directory as
>> its name is the caller thread? I can see that thread come to life in
>> VisualVM.
>>
>> If maxQueueSize = 0 then all files present in the input directory are
>> always moved to the inprogress-folder AND the rejectedPolicy="Abort" is
>> honored! There are always maxPoolSize files processed in parallel.
>>
>>
>> Camel 2.10.0
>> Independent of the value of maxQueueSize, if rejectedPolicy="Abort" then
>> the policy is honored, sort of. All files present in the input directory
>> are always moved to the inprogress-folder. Only maxPoolSize +
>> maxQueueSize files are being processed. All others stay untouched in the
>> "inprogress" folder!
>>
>> Using <camel:threadPool> and referencing that in <camel:threads> does
>> not change the above behavior for 2.9.2 or 2.10.0.
>>
>>
>> The behavior I expected was that with rejectedPolicy="Abort" exactly
>> maxPoolSize + maxQueueSize files are moved from the in-folder to the
>> inprogress-folder and processed from there. As processing of one file
>> completes it gets deleted and a file from the in-folder is moved to the
>> inprogress-folder on the next poll of the in-folder.
>>
>>
>> Thanks!
>> Ralf
>
>
>


