Hi

Thanks for sharing your findings.
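
For readers skimming the thread, the workaround described in the quoted messages below might look roughly like this in Spring XML. This is only a sketch under assumptions: the bean id `throttlePolicy`, the pool size of 10 and the threshold values are illustrative and not taken from Ralf's actual configuration. The `maxQueueSize="0"` setting is the one reported for Camel 2.9.2, so check how your Camel version interprets that value.

```xml
<!-- Sketch only: the id "throttlePolicy" and all numeric values are assumptions. -->
<!-- Suspends the route consumer at 5 in-flight exchanges, resumes at 80% of that (4). -->
<bean id="throttlePolicy"
      class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
  <property name="maxInflightExchanges" value="5" />
  <property name="resumePercentOfMax" value="80" />
</bean>

<!-- File endpoint from the thread, plus the two options mentioned as part of the workaround. -->
<endpoint id="fileBufferFrom"
          uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress&amp;maxMessagesPerPoll=1&amp;useFixedDelay=false" />

<camel:route id="processFromFileBuffer" routePolicyRef="throttlePolicy">
  <camel:from ref="fileBufferFrom" />
  <!-- Pool about twice the suspend threshold; queue depth "unlimited" as reported for 2.9.2. -->
  <camel:threads poolSize="10" maxPoolSize="10" maxQueueSize="0"
                 threadName="file consumer">
    <camel:convertBodyTo type="java.io.InputStream" />
    <camel:split streaming="true" parallelProcessing="true">
      <camel:tokenize token="\r\n" />
      <!-- "mq.csv" is the target endpoint from the original route, defined elsewhere. -->
      <camel:to ref="mq.csv" />
    </camel:split>
  </camel:threads>
</camel:route>
```

The idea is that the route policy, rather than the executor's queue, limits how many files are in flight, so the file consumer thread should not end up running a task itself.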

On Wed, Sep 26, 2012 at 7:47 PM, Ralf Steppacher <[email protected]> wrote:
> Well, turns out that the only reliable way of preventing the file
> consumer thread from being assigned tasks is to set the queue depth of
> the executor pool to "unlimited" (maxQueueSize="0") and use the
> throttling route policy and sufficiently low maxMessagesPerPoll at the
> file endpoint to manage the queue depth.
>
> Ralf
>
>
>
> -----Original Message-----
> From: Ralf Steppacher <[email protected]>
> Reply-to: [email protected]
> To: [email protected]
> Subject: Re: File and async/<camel:threads> woes
> Date: Tue, 25 Sep 2012 19:27:17 +0200
>
> Claus,
>
> I finally implemented your suggestion with the route policy and was able
> to work around the problem of the caller thread getting blocked by a
> large task. Indeed the throttling affects the number of files being
> processed, not the number of exchanges created by the splitter.
> In my case, if the number of threads in the pool is about double the
> suspend threshold I can prevent the caller thread from being blocked.
> However, if the pattern of file sizes changes, this might not be true
> anymore.
>
> I also realized that I have to set useFixedDelay=false on the file
> endpoint to allow for continuous feeding of files for async processing.
> The default of useFixedDelay=true also leads to large files blocking
> processing of pending files (by way of preventing the file endpoint from
> polling).
> I double checked that this was not my root problem all along; but it is
> not. Without the throttling route policy and with useFixedDelay=false
> processing still gets stuck on large files.
>
> All the above I have tested with Camel 2.9.2. Due to time constraints I
> have not yet given 2.10.x another shot with this.
>
>
> Ralf
>
>
> -----Original Message-----
> From: Claus Ibsen <[email protected]>
> Reply-to: [email protected]
> To: [email protected]
> Subject: Re: File and async/<camel:threads> woes
> Date: Sun, 9 Sep 2012 10:33:45 +0200
>
> Hi
>
> You can use route policy to control the route consumer to
> suspend/resume, depending on number of in flight exchanges for the
> route.
> http://camel.apache.org/routepolicy
>
> Then you can suspend when reaching 5, and resume when back to 4 or lower.
> There is a ThrottlingInflightRoutePolicy for that. Though its
> watermark is percent based. Also consider setting maxMessagesPerPoll=1
> on the file consumer endpoint, so it only grabs 1 file at a time.
>
> About the thread pools. Take a closer look at the difference between
> Abort and Reject. You would need to use the one that ensures a
> rollback, when the thread pool is full. So the preMove file gets
> rolled back (assuming that logic works).
>
>
>
>
> On Fri, Sep 7, 2012 at 3:54 PM, Ralf Steppacher
> <[email protected]> wrote:
>> Hello Claus,
>>
>> If I want 5 files to be processed in parallel line by line, every line
>> taken care of asynchronously, don't I need the
>> <threads ...><camel:split ...></camel:split></threads> construct?
>> If threads and split refer to the same thread pool, then of course I
>> would not have to worry about finding the sweet spot in terms of number
>> of threads in the two pools; to keep all threads in split-pool as busy
>> as possible and have no thread in the threads-pool waiting. I prefer to
>> keep them apart because that way I have more control about the number
>> and order (small files first) in which they are processed.
>>
>> Using one thread pool would not solve my problem of the rejectedPolicy
>> not being honored, would it? This really kills me. I am processing files
>> ranging from 1kb to 200mb, sorted by their size.
>> If the caller thread ends up with the 200mb file, all other processing
>> stalls until the big file has been processed. I need to prevent
>> processing of small files being blocked by large files.
>>
>> Is the file consumer supposed to preMove all files from the input
>> directory to the inprogress directory (2.10.0 behavior) or just the ones
>> it is actually going to process, i.e. number of threads (2.9.2
>> behavior)?
>>
>> With rejectedPolicy="Abort":
>> If the 2.10.0 preMove behavior is the expected behavior, is it expected
>> that all files that could not be processed on the first poll of the file
>> endpoint (file number > number of threads) stay in the inprogress
>> directory for ever, never being processed (2.10.0 behavior) or should
>> they be picked up as soon as a thread becomes free (2.9.2 behavior with
>> respective preMove behavior) and the file endpoint polls again?
>>
>>
>> Thanks!
>> Ralf
>>
>>
>> -----Original Message-----
>> From: Claus Ibsen <[email protected]>
>> Reply-to: [email protected]
>> To: [email protected]
>> Subject: Re: File and async/<camel:threads> woes
>> Date: Thu, 6 Sep 2012 11:30:29 +0200
>>
>> Hi
>>
>> Why make it so complicated with 2 thread pools? The splitter can just
>> refer to a custom thread pool profile / thread pool which you can
>> customize as you want.
>>
>> Also, the file consumer with preMove will move the file as soon as it
>> starts routing.
>>
>>
>> On Thu, Aug 30, 2012 at 5:24 PM, Ralf Steppacher
>> <[email protected]> wrote:
>>> Hallo all,
>>>
>>> I have several issues defining an async route in Spring XML.
>>> Can someone shed some light on what the expected behavior is and what
>>> would be a bug and/or a misunderstanding on my side?
>>>
>>> The stripped down route:
>>>
>>> <endpoint id="fileBufferFrom"
>>>     uri="file://${my.dir}/?delete=true&amp;idempotent=true&amp;preMove=inprogress"
>>> />
>>>
>>> <camel:route id="processFromFileBuffer">
>>>   <camel:from ref="fileBufferFrom" />
>>>   <camel:threads poolSize="5" maxPoolSize="5" maxQueueSize="1"
>>>       threadName="file consumer" rejectedPolicy="Abort">
>>>     <camel:convertBodyTo type="java.io.InputStream" />
>>>     <camel:split streaming="true" parallelProcessing="true">
>>>       <camel:tokenize token="\r\n" />
>>>       <camel:to ref="mq.csv" />
>>>     </camel:split>
>>>   </camel:threads>
>>> </camel:route>
>>>
>>>
>>> Camel 2.9.2
>>> If maxQueueSize > 0 then maxPoolSize + maxQueueSize files are moved from
>>> the in-folder to the inprogress-folder. If there are more files they are
>>> moved to the inprogress-folder as soon as the caller thread is free to
>>> move them and worker threads are available.
>>> Setting rejectedPolicy="Abort" or callerRunsWhenRejected="false" seems
>>> to get ignored and the reject policy "CallerRuns" is applied. The source
>>> is a file endpoint, so I assume the thread with the scanned directory as
>>> its name is the caller thread? I can see that thread come to life in
>>> VisualVM.
>>>
>>> If maxQueueSize = 0 then all files present in the input directory are
>>> always moved to the inprogress-folder AND the rejectedPolicy="Abort" is
>>> honored! There are always maxPoolSize files processed in parallel.
>>>
>>>
>>> Camel 2.10.0
>>> Independent of the value of maxQueueSize, if rejectedPolicy="Abort" then
>>> the policy is honored, sort of. All files present in the input directory
>>> are always moved to the inprogress-folder. Only maxPoolSize +
>>> maxQueueSize files are being processed. All others stay untouched in the
>>> "inprogress" folder!
>>>
>>> Using <camel:threadPool> and referencing that in <camel:threads> does
>>> not change the above behavior for 2.9.2 or 2.10.0.
>>>
>>>
>>> The behavior I expected was that with rejectedPolicy="Abort" always
>>> maxPoolSize + maxQueueSize files are moved from the in-folder to the
>>> inprogress-folder and processed from there. As processing of one file
>>> completes it gets deleted and a file from the in-folder is moved to the
>>> inprogress-folder on the next poll of the in-folder.
>>>
>>>
>>> Thanks!
>>> Ralf
>>
>>
>>
>
>
>

--
Claus Ibsen
-----------------
Red Hat, Inc.
FuseSource is now part of Red Hat
Email: [email protected]
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
