Hi

You can use a route policy to suspend and resume the route consumer, depending on the number of in-flight exchanges for the route.
http://camel.apache.org/routepolicy
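A minimal sketch of how such a policy might be wired in Spring XML, assuming Camel 2.x (where the class lives in org.apache.camel.impl). The bean id inflightPolicy and the threshold values are illustrative assumptions, not taken from the original route, and the exact suspend/resume boundaries depend on how the policy compares the in-flight count:

```xml
<!-- Sketch only: bean id and values are assumptions for illustration -->
<bean id="inflightPolicy"
      class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
  <!-- upper bound on in-flight exchanges before the consumer is suspended -->
  <property name="maxInflightExchanges" value="5"/>
  <!-- resume watermark, given as a percentage of the maximum (80% of 5 = 4) -->
  <property name="resumePercentOfMax" value="80"/>
</bean>

<!-- inside the existing camelContext: attach the policy to the route -->
<camel:route id="processFromFileBuffer" routePolicyRef="inflightPolicy">
  <camel:from ref="fileBufferFrom"/>
  <!-- rest of the route unchanged -->
  <camel:to ref="mq.csv"/>
</camel:route>
```

The policy only throttles the consumer; it does not change how the thread pool handles rejected tasks.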
Then you can suspend when the route reaches 5 in-flight exchanges, and resume when it is back to 4 or lower.
There is a ThrottlingInflightRoutePolicy for that, though its watermark is percent based.

Also consider setting maxMessagesPerPoll=1 on the file consumer endpoint, so it only grabs 1 file at a time.

About the thread pools: take a closer look at the difference between Abort and Reject. You need to use the one that ensures a rollback when the thread pool is full, so that the preMove file gets rolled back (assuming that logic works).


On Fri, Sep 7, 2012 at 3:54 PM, Ralf Steppacher <[email protected]> wrote:
> Hello Claus,
>
> If I want 5 files to be processed in parallel line by line, every line
> taken care of asynchronously, don't I need the
> <threads ...><camel:split ...></camel:split></threads> construct?
> If threads and split refer to the same thread pool, then of course I
> would not have to worry about finding the sweet spot in terms of number
> of threads in the two pools; to keep all threads in split-pool as busy
> as possible and have no thread in the threads-pool waiting. I prefer to
> keep them apart because that way I have more control over the number
> and order (small files first) in which they are processed.
>
> Using one thread pool would not solve my problem of the rejectedPolicy
> not being honored, would it? This really kills me. I am processing files
> ranging from 1 KB to 200 MB, sorted by their size. If the caller thread
> ends up with the 200 MB file, all other processing stalls until the big
> file has been processed. I need to prevent processing of small files
> being blocked by large files.
>
> Is the file consumer supposed to preMove all files from the input
> directory to the inprogress directory (2.10.0 behavior) or just the ones
> it is actually going to process, i.e. number of threads (2.9.2
> behavior)?
>
> With rejectedPolicy="Abort":
> If the 2.10.0 preMove behavior is the expected behavior, is it expected
> that all files that could not be processed on the first poll of the file
> endpoint (file number > number of threads) stay in the inprogress
> directory forever, never being processed (2.10.0 behavior) or should
> they be picked up as soon as a thread becomes free (2.9.2 behavior with
> respective preMove behavior) and the file endpoint polls again?
>
>
> Thanks!
> Ralf
>
>
> -----Original Message-----
> From: Claus Ibsen <[email protected]>
> Reply-to: [email protected]
> To: [email protected]
> Subject: Re: File and async/<camel:threads> woes
> Date: Thu, 6 Sep 2012 11:30:29 +0200
>
> Hi
>
> Why make it so complicated with 2 thread pools? The splitter can just
> refer to a custom thread pool profile / thread pool which you can
> customize as you want.
>
> Also the file consumer with preMove will move the file as soon as it
> starts routing.
>
>
> On Thu, Aug 30, 2012 at 5:24 PM, Ralf Steppacher
> <[email protected]> wrote:
>> Hello all,
>>
>> I have several issues defining an async route in Spring XML.
>> Can someone shed some light on what the expected behavior is and what
>> would be a bug and/or a misunderstanding on my side?
>>
>> The stripped down route:
>>
>> <endpoint id="fileBufferFrom"
>>     uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress"
>> />
>>
>> <camel:route id="processFromFileBuffer">
>>     <camel:from ref="fileBufferFrom" />
>>     <camel:threads poolSize="5" maxPoolSize="5" maxQueueSize="1"
>>         threadName="file consumer" rejectedPolicy="Abort">
>>         <camel:convertBodyTo type="java.io.InputStream" />
>>         <camel:split streaming="true" parallelProcessing="true">
>>             <camel:tokenize token="\r\n" />
>>             <camel:to ref="mq.csv" />
>>         </camel:split>
>>     </camel:threads>
>> </camel:route>
>>
>>
>> Camel 2.9.2
>> If maxQueueSize > 0 then maxPoolSize + maxQueueSize files are moved from
>> the in-folder to the inprogress-folder.
>> If there are more files they are
>> moved to the inprogress-folder as soon as the caller thread is free to
>> move them and worker threads are available.
>> Setting rejectedPolicy="Abort" or callerRunsWhenRejected="false" seems
>> to get ignored and the reject policy "CallerRuns" is applied. The source
>> is a file endpoint, so I assume the thread with the scanned directory as
>> its name is the caller thread? I can see that thread come to life in
>> VisualVM.
>>
>> If maxQueueSize = 0 then all files present in the input directory are
>> always moved to the inprogress-folder AND the rejectedPolicy="Abort" is
>> honored! There are always maxPoolSize files processed in parallel.
>>
>>
>> Camel 2.10.0
>> Independent of the value of maxQueueSize, if rejectedPolicy="Abort" then
>> the policy is honored, sort of. All files present in the input directory
>> are always moved to the inprogress-folder. Only maxPoolSize +
>> maxQueueSize files are being processed. All others stay untouched in the
>> "inprogress" folder!
>>
>> Using <camel:threadPool> and referencing that in <camel:threads> does
>> not change the above behavior for 2.9.2 or 2.10.0.
>>
>>
>> The behavior I expected was that with rejectedPolicy="Abort" always
>> maxPoolSize + maxQueueSize files are moved from the in-folder to the
>> inprogress-folder and processed from there. As processing of one file
>> completes it gets deleted and a file from the in-folder is moved to the
>> inprogress-folder on the next poll of the in-folder.
>>
>>
>> Thanks!
>> Ralf
>
>
>

--
Claus Ibsen
-----------------
FuseSource
Email: [email protected]
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
