Looking at my example code again, I am passing an Aggregator to the
Splitter, along with a completion predicate. The issue is that it appears
that when using an aggregator with a large number of split items, you
eventually run out of stack space.

On Sun, Dec 15, 2019 at 9:55 AM Jeremy Ross <jeremy.g.r...@gmail.com> wrote:

> My understanding is that if you pass an aggregator to the splitter that
> all split elements would be aggregated as one output message. My use case
> requires splits elements to be aggregated somewhat arbitrarily. E.g., if
> the splitter splits a message into 1000 elements, this will be aggregated
> into 20-40 "output" messages, depending on the content of the split items.
>
> On Sun, Dec 15, 2019 at 9:42 AM Mantas Gridinas <mgridi...@gmail.com>
> wrote:
>
>> Splitter already features an aggregation strategy as argument. By
>> default, it discards any modifications done to messages while
>> consuming them during splitter EIP. Are you sure you aren't looking
>> for that?
>>
>> On Sun, Dec 15, 2019 at 5:53 AM Jeremy Ross <jeremy.g.r...@gmail.com>
>> wrote:
>> >
>> > Hi,
>> >
>> > I have a use case that involves splitting a very large list, then
>> > aggregating it into smaller lists and processing those lists once they
>> > reach a certain size. I'm getting a StackOverflowError (good luck
>> googling
>> > 'Camel stackoverflow'), which makes me think I'm doing something wrong.
>> > Here's a route that reproduces the issue:
>> >
>> > from("direct:aggregateTest")
>> >     .setBody(() -> IntStream.rangeClosed(0, 50000)
>> >             .boxed().collect(Collectors.toList()))
>> >     .split().body().stopOnException()
>> >         .aggregate(constant(true), new AggregationStrategy() {
>> >             @Override
>> >             public Exchange aggregate(Exchange oldExchange, Exchange
>> > newExchange) {
>> >                 if (oldExchange == null) {
>> >                     newExchange.getIn().setBody(new ArrayList<>(List.of(
>> >
>>  newExchange.getIn().getBody(Integer.class))));
>> >                     return newExchange;
>> >                 }
>> >                 ((List<Integer>) oldExchange.getIn().getBody())
>> >
>>  .add(newExchange.getIn().getBody(Integer.class));
>> >                 return oldExchange;
>> >             }
>> >         }).completion(exchange ->
>> > exchange.getIn().getBody(List.class).size() == 10)
>> >             .log("aggregated") // doing something more interesting than
>> > logging here
>> >         .end() // end aggregate
>> >     .end(); // end split
>> >
>> > After about 680 aggregations, I get a StackOverflowError and one helluva
>> > stack trace. One other wrinkle is that I'm not simply splitting a big
>> list
>> > into smaller lists. There's some processing and conditional logic that
>> > means that many elements from the big list don't ever make it into a
>> > smaller list, thus the need for some interesting aggregation logic. Yet
>> > another wrinkle is that the smaller lists have to be no bigger than 25
>> > elements, so I can't just use the composed message processor pattern.
>> >
>> > Thanks for any help!
>> >
>> > Jeremy
>>
>

Reply via email to