Looking at my example code again, I am passing an Aggregator to the Splitter, along with a completion predicate. The issue appears to be that, when an aggregator is used with a large number of split items, you eventually run out of stack space.
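
To make the requirement concrete outside of Camel, here is a minimal plain-Java sketch of the batching I am after: walk the big list once, drop the elements that fail some condition, and emit lists no larger than a fixed size. It is not a fix for the route and uses no Camel API. The names `BoundedBatcher` and `forEachBatch` are hypothetical, and the even-number filter is only a stand-in for the real conditional logic; the limit of 25 is the one from my use case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper, not part of Camel: bounded batching with a plain loop.
class BoundedBatcher {

    /**
     * Iterates over the items, keeps those accepted by the filter, and passes
     * each batch to the sink as soon as it reaches maxBatchSize. A trailing
     * partial batch is passed to the sink at the end. The loop is iterative,
     * so stack depth does not grow with the number of items.
     */
    public static <T> void forEachBatch(Iterable<T> items,
                                        Predicate<T> keep,
                                        int maxBatchSize,
                                        Consumer<List<T>> sink) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be >= 1");
        }
        List<T> current = new ArrayList<>(maxBatchSize);
        for (T item : items) {
            if (!keep.test(item)) {
                continue; // element never makes it into a smaller list
            }
            current.add(item);
            if (current.size() == maxBatchSize) {
                sink.accept(current);
                current = new ArrayList<>(maxBatchSize);
            }
        }
        if (!current.isEmpty()) {
            sink.accept(current); // flush the final, partially filled batch
        }
    }

    public static void main(String[] args) {
        // Same input as the route: the integers 0..50000 inclusive.
        List<Integer> input = IntStream.rangeClosed(0, 50000)
                .boxed().collect(Collectors.toList());

        List<List<Integer>> batches = new ArrayList<>();
        // Assumption: "keep even numbers" stands in for the real filtering.
        forEachBatch(input, n -> n % 2 == 0, 25, batches::add);

        // Prints "1001 batches, last batch size 1"
        System.out.println(batches.size() + " batches, last batch size "
                + batches.get(batches.size() - 1).size());
    }
}
```

In the real route the sink would be whatever processing currently follows the aggregate step, and the filter could just as well decide per element which batch it belongs to.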

On Sun, Dec 15, 2019 at 9:55 AM Jeremy Ross <jeremy.g.r...@gmail.com> wrote:

> My understanding is that if you pass an aggregator to the splitter that
> all split elements would be aggregated as one output message. My use case
> requires split elements to be aggregated somewhat arbitrarily. E.g., if
> the splitter splits a message into 1000 elements, this will be aggregated
> into 20-40 "output" messages, depending on the content of the split items.
>
> On Sun, Dec 15, 2019 at 9:42 AM Mantas Gridinas <mgridi...@gmail.com>
> wrote:
>
>> Splitter already features an aggregation strategy as argument. By
>> default, it discards any modifications done to messages while
>> consuming them during splitter EIP. Are you sure you aren't looking
>> for that?
>>
>> On Sun, Dec 15, 2019 at 5:53 AM Jeremy Ross <jeremy.g.r...@gmail.com>
>> wrote:
>> >
>> > Hi,
>> >
>> > I have a use case that involves splitting a very large list, then
>> > aggregating it into smaller lists and processing those lists once they
>> > reach a certain size. I'm getting a StackOverflowError (good luck
>> > googling 'Camel stackoverflow'), which makes me think I'm doing
>> > something wrong.
>> >
>> > Here's a route that reproduces the issue:
>> >
>> > from("direct:aggregateTest")
>> >     .setBody(() -> IntStream.rangeClosed(0, 50000)
>> >         .boxed().collect(Collectors.toList()))
>> >     .split().body().stopOnException()
>> >         .aggregate(constant(true), new AggregationStrategy() {
>> >             @Override
>> >             public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>> >                 if (oldExchange == null) {
>> >                     newExchange.getIn().setBody(new ArrayList<>(List.of(
>> >                         newExchange.getIn().getBody(Integer.class))));
>> >                     return newExchange;
>> >                 }
>> >                 ((List<Integer>) oldExchange.getIn().getBody())
>> >                     .add(newExchange.getIn().getBody(Integer.class));
>> >                 return oldExchange;
>> >             }
>> >         }).completion(exchange ->
>> >             exchange.getIn().getBody(List.class).size() == 10)
>> >         .log("aggregated") // doing something more interesting than logging here
>> >         .end() // end aggregate
>> >     .end(); // end split
>> >
>> > After about 680 aggregations, I get a StackOverflowError and one helluva
>> > stack trace. One other wrinkle is that I'm not simply splitting a big
>> > list into smaller lists. There's some processing and conditional logic
>> > that means that many elements from the big list don't ever make it into
>> > a smaller list, thus the need for some interesting aggregation logic.
>> > Yet another wrinkle is that the smaller lists have to be no bigger than
>> > 25 elements, so I can't just use the composed message processor pattern.
>> >
>> > Thanks for any help!
>> >
>> > Jeremy
>> >