On Tue, Mar 2, 2010 at 4:09 PM, Andrew Chandler <[email protected]> wrote:
> We wound up resolving this by basically totally subverting a secondary
> splitter - we split the first time to do the parallel work and then
> route to a second custom splitter that maintains an internal map of
> result objects - the map is based on the correlation key and the object
> includes properties for totalsplit count for that correlation key,
> results received so far, and isForwarded. Basically the first time a
> correlation comes in a result is created into the map for the
> correlationid. If any of the results indicate success we set the
> isForwarded to true and return a message in our list of results
> indicating the success, however we don't remove the correlation id from
> the map until ALL responses are received. It's not as pretty as an
> aggregator but it does seem to work for us. We've had to do that
> elsewhere as well though and considering we seem to be using memory at a
> staggering rate my worry is that we're passing around too many maps in
> messages that are duplicated when split etc. Time to pull out the
> profiler I guess :)
>
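The bookkeeping Andrew describes can be sketched in plain Java, outside of Camel. This minimal version assumes that each split part reports three things: its correlation id, the total split count for that id, and a success flag. The class and method names (CorrelationTracker, ResultState, onResult) and the "SUCCESS:"/"FAILURE:" strings are hypothetical. The sketch forwards the first success at once and produces a single negative result when every part fails. It removes an entry only after all responses for that id have arrived.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the correlation bookkeeping described above.
// CorrelationTracker, ResultState and the "SUCCESS:"/"FAILURE:" strings are
// illustrative names only; they are not part of the Camel API.
public class CorrelationTracker {

    // Per-correlation state: total split count, results received so far, isForwarded.
    static final class ResultState {
        final int totalSplitCount;
        int received;
        boolean forwarded;

        ResultState(int totalSplitCount) {
            this.totalSplitCount = totalSplitCount;
        }
    }

    private final Map<String, ResultState> states = new HashMap<>();

    // Records one split part's result and returns the message to forward, if any.
    // synchronized because split parts may report from different threads.
    public synchronized Optional<String> onResult(String correlationId, int totalSplitCount, boolean success) {
        // The first result for a correlation id creates its entry in the map.
        ResultState state = states.computeIfAbsent(correlationId, id -> new ResultState(totalSplitCount));
        state.received++;

        Optional<String> toForward = Optional.empty();
        if (success && !state.forwarded) {
            // First success: forward immediately, without waiting for the siblings.
            state.forwarded = true;
            toForward = Optional.of("SUCCESS:" + correlationId);
        } else if (!state.forwarded && state.received == state.totalSplitCount) {
            // Every part has reported and none succeeded: forward one negative result.
            toForward = Optional.of("FAILURE:" + correlationId);
        }

        // Only drop the entry once ALL responses for this correlation id are in.
        if (state.received == state.totalSplitCount) {
            states.remove(correlationId);
        }
        return toForward;
    }

    // Number of correlation ids still waiting for responses.
    public synchronized int pending() {
        return states.size();
    }

    public static void main(String[] args) {
        CorrelationTracker tracker = new CorrelationTracker();
        System.out.println(tracker.onResult("c1", 3, false)); // Optional.empty
        System.out.println(tracker.onResult("c1", 3, true));  // Optional[SUCCESS:c1]
        System.out.println(tracker.onResult("c1", 3, false)); // Optional.empty
        System.out.println(tracker.pending());                // 0
    }
}
```

Keeping the entry until the last response arrives prevents a late failure from being mistaken for a new correlation. The trade-off is that state for each id stays in memory until its slowest part reports.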
Thanks for sharing your solution. Where do you store those Maps? I would
assume the copy is cheap as it's just a reference copy, and not a deep copy.

>
> On Tue, 2010-03-02 at 07:03 +0100, Claus Ibsen wrote:
>
>> On Mon, Mar 1, 2010 at 8:24 PM, Andrew Chandler <[email protected]> wrote:
>> > When does 2.3 come out? It sounds like what I want, but I'm pretty sure
>> > we can't update to something that isn't released yet, or at least very
>> > close to release.
>> >
>>
>> 2.2 was just recently released. I would think 2.3 is a couple more
>> months away.
>>
>> If you want a supported and more frequently released version of Camel, then I
>> can only recommend taking a look at the FUSE versions.
>>
>> >
>> > On Mon, 2010-03-01 at 19:51 +0100, Claus Ibsen wrote:
>> >
>> >> Hi
>> >>
>> >> Try the new overhauled aggregator in 2.3:
>> >> http://camel.apache.org/aggregator2.html
>> >>
>> >> It works better with completion triggers.
>> >>
>> >> On Mon, Mar 1, 2010 at 5:40 PM, Andrew Chandler <[email protected]> wrote:
>> >> > Hi there - with Claus's help I've been able to get most of the way to
>> >> > where I need to be. Right now I'm doing a proof of concept with string
>> >> > payloads; however, in the end the payload will be an object. Here's what
>> >> > I'm attempting:
>> >> >
>> >> > I have an incoming message that contains an identifier as well as (N)
>> >> > things to do against it. The (N) things can be done in parallel, so
>> >> > what we are doing is splitting based on the (N) things. Here's where
>> >> > it gets tricky:
>> >> > - The first of the (N) things to report success should be sent on, while
>> >> > the rest of them should be aborted. We should then forward the success
>> >> > on immediately, not waiting for timeouts.
>> >> > - Further, in the event that none of them report success, we should
>> >> > aggregate until all (N) things have reported failure and then forward
>> >> > that single negative result onward.
>> >> > - As the (N) things inherently have timeouts built into them, it would be
>> >> > nice if I didn't have to deal with batchTimeout for the aggregator.
>> >> >
>> >> > What I'm seeing now with my prototype is that I can successfully split
>> >> > and process the split things using a threadPoolExecutor. I pass them
>> >> > to .aggregate(header("JMSCorrelationID"), new MyAggregationStrategy())
>> >> >
>> >> > Assume each of the split items has a built-in timeout on its work
>> >> > effort of 5 seconds.
>> >> > With that, and without a .batchTimeout(7000L), I was seeing 2
>> >> > results from aggregate: 1 almost immediately for the successful
>> >> > result, and then a second aggregated message that had all the failures
>> >> > about 4.5 seconds later. When I tacked .batchTimeout(7000L) onto
>> >> > the .aggregate clause, though, I got 1 single message that had the success
>> >> > and the failures all in one. This is close; however, I guess what
>> >> > I'm asking is: how can I control, from inside the aggregation, the decision
>> >> > to move forward? In the splitter I'm already planning on including in
>> >> > each split object a shared object that can be used to abort any of the
>> >> > sibling split objects, so I think I have a handle on that.
>> >> > Basically, the reason I need the aggregate mechanism to control the
>> >> > continuing-on part of the process is that if we're going after, say,
>> >> > 60,000 things, then the ability to start work on the successful ones
>> >> > after 1/2 second instead of waiting 6 or 7 seconds for a batch timeout
>> >> > is significant. But I still have to account for a totally negative
>> >> > response in the event none of them are successful.
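The requirements in the original question are: forward the first success immediately, abort its siblings, and otherwise emit one negative result once all N attempts have failed. These closely match the contract of the JDK's ExecutorService.invokeAny. The sketch below swaps in that plain-JDK technique instead of a Camel aggregator, so it only fits if the N attempts can run inside one process. The attempt helper and its delays are hypothetical stand-ins for the real work and its built-in 5-second timeouts.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK sketch (not Camel): run N attempts in parallel, take the first
// success, cancel the rest, or report one failure if every attempt fails.
public class FirstSuccess {

    // Hypothetical attempt: waits delayMs (standing in for the real work and
    // its built-in timeout), then either succeeds or throws.
    public static Callable<String> attempt(String name, long delayMs, boolean succeeds) {
        return () -> {
            Thread.sleep(delayMs); // interrupted if invokeAny cancels this attempt
            if (!succeeds) {
                throw new Exception(name + " failed");
            }
            return name + " succeeded";
        };
    }

    // invokeAny returns the result of one attempt that completed without throwing
    // and cancels the others; if every attempt throws, it throws ExecutionException.
    public static String firstSuccessOrFailure(ExecutorService pool, List<Callable<String>> attempts) {
        try {
            return pool.invokeAny(attempts);
        } catch (ExecutionException allFailed) {
            return "FAILED: no attempt succeeded";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "FAILED: interrupted";
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            // One fast success among slower failures.
            System.out.println(firstSuccessOrFailure(pool, List.of(
                    attempt("a", 200, false), attempt("b", 50, true), attempt("c", 500, false))));
            // Every attempt fails, so a single negative result comes back.
            System.out.println(firstSuccessOrFailure(pool, List.of(
                    attempt("d", 10, false), attempt("e", 20, false))));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

invokeAny also has an overload taking a timeout and a TimeUnit, which throws TimeoutException if no attempt succeeds in time. Cancellation of the losing attempts relies on them responding to thread interruption.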
>> >> >
>> >> > I'm presently looking at creating my own AggregationCollection, as it
>> >> > seemed to allow me to figure out the size of the aggregated collection, and I
>> >> > can somehow figure out the total number of items split versus how many
>> >> > have been aggregated to determine when I'm done. (I thought that info was
>> >> > supposed to be in the header somewhere, but it doesn't seem to be there.)
>> >> >
>> >> > Any insights or redirects are appreciated.

-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
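Claus's reply assumes that passing Andrew's maps along is cheap because only a reference is copied. The plain-Java sketch below shows two cases: split parts that all share one map, and split parts that each get a shallow copy. Part, splitSharing and splitCopying are hypothetical names. The sketch does not show whether any particular Camel version copies message state when splitting; that is what a profiler would confirm.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of reference copy vs. per-part copy when one map of
// state travels with every split part. Part is a hypothetical stand-in for a
// split message; this does not show what Camel itself copies when splitting.
public class SharedMapDemo {

    public static final class Part {
        public final String body;
        public final Map<String, Object> state;

        Part(String body, Map<String, Object> state) {
            this.body = body;
            this.state = state;
        }
    }

    // Every part holds a reference to the same Map instance: no map is copied.
    public static List<Part> splitSharing(Map<String, Object> state, List<String> bodies) {
        List<Part> parts = new ArrayList<>();
        for (String body : bodies) {
            parts.add(new Part(body, state));
        }
        return parts;
    }

    // Every part gets its own HashMap. This is a shallow copy: each new map has
    // its own entries, but the values they point to are still shared.
    public static List<Part> splitCopying(Map<String, Object> state, List<String> bodies) {
        List<Part> parts = new ArrayList<>();
        for (String body : bodies) {
            parts.add(new Part(body, new HashMap<>(state)));
        }
        return parts;
    }

    public static void main(String[] args) {
        Map<String, Object> state = new HashMap<>();
        state.put("results", new ArrayList<String>());

        List<Part> shared = splitSharing(state, List.of("x", "y"));
        List<Part> copied = splitCopying(state, List.of("x", "y"));

        System.out.println(shared.get(0).state == shared.get(1).state);                 // true
        System.out.println(copied.get(0).state == copied.get(1).state);                 // false
        System.out.println(copied.get(0).state.get("results") == state.get("results")); // true
    }
}
```

Even in the copying case, each copy costs one new map object per part, not a duplicate of the values inside it.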
