@Claus I could be wrong, but the composed message processor (splitter-only) doesn’t appear to solve it. I’m not actually interested in the final composed response; I’m only interested in the message being split and processed in batches.
My actual route will split a huge file (with streaming and parallel processing), process each part, and save all parts to MongoDB (in batches) before finally invoking other routes. The splitter-only approach would essentially aggregate the entire file in memory, which won’t work. My original sample route appears to use the splitter+aggregator approach; however, as stated, the timing doesn’t work after the split/end: the route continues before the final aggregated group has completed.

On 11/4/16, 2:37 AM, "Claus Ibsen" <claus.ib...@gmail.com> wrote:

See the composed message processor EIP with the splitter only
http://camel.apache.org/composed-message-processor.html

On Thu, Nov 3, 2016 at 11:32 PM, Craig Washington <craig.washing...@aexp.com.invalid> wrote:
> Hello,
> I have a simple use case where I'd like to do the following:
> * split a message and process each part
> * aggregate parts into groups of size N for group-processing (w/timeout to ensure no parts are lost)
> * continue route ONLY after all aggregated parts have completed
>
> The simplified route and output are as follows:
> ---
>
> public class CamelSplitAggregateWaitForCompletion extends CamelTestSupport {
>     @Test
>     public void test() throws Exception {
>         template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE");
>         Thread.sleep(3000);
>     }
>
>     @Override
>     protected RoutesBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 from("direct:start")
>                     .split(body().tokenize(","), new UseLatestAggregationStrategy())
>                     .streaming()
>                         //process each msg part
>                         .log("processing the msg: ${body}")
>                         //group by size
>                         .aggregate(constant(true), new GroupedMessageAggregationStrategy())
>                         .completionSize(2).completionTimeout(2000)
>                             //save grouped messages in batches
>                             .log("saving msg group ${body}")
>                             .to("mock:result")
>                         //end the aggregate processing
>                         .end()
>                     //end the split processing
>                     .end()
>                     .log("*** COMPLETED split+aggregate processing");
>
>                 //...do some more stuff here ONLY after all parts are complete...
>             }
>         };
>     }
> }
>
> ---
> //output
> 15:28:52.072 INFO route1 - processing the msg: AAA
> 15:28:52.074 INFO route1 - processing the msg: BBB
> 15:28:52.075 INFO route1 - saving msg group List<Exchange>(2 elements)
> 15:28:52.076 INFO route1 - processing the msg: CCC
> 15:28:52.077 INFO route1 - processing the msg: DDD
> 15:28:52.077 INFO route1 - saving msg group List<Exchange>(2 elements)
> 15:28:52.078 INFO route1 - processing the msg: EEE
> 15:28:52.079 INFO route1 - *** COMPLETED split+aggregate processing
> 15:28:55.064 INFO route1 - saving msg group List<Exchange>(1 elements)
> ---
>
> Ideally, the "COMPLETED" line should print last (after the final aggregated group from timeout).
> Seems simple enough though I haven't found a way to get this working. Neither of the completionTimeout examples I've found in source nor CIA focus on the route timing after the split.
>
> (I'm actually trying to process a large file with streaming and parallelProcessing so completionFromBatchConsumer() wouldn't work, though I think this part is irrelevant)
>
> Using Camel 2.17.3
>
> Thanks
>
>
>
> American Express made the following annotations
> ******************************************************************************
> "This message and any attachments are solely for the intended recipient and may contain confidential or privileged information. If you are not the intended recipient, any disclosure, copying, use, or distribution of the information included in this message and any attachments is prohibited. If you have received this communication in error, please notify us by reply e-mail and immediately and permanently delete this message and any attachments. Thank you."
>
> American Express a ajouté le commentaire suivant le Ce courrier et toute pièce jointe qu'il contient sont réservés au seul destinataire indiqué et peuvent renfermer des
> renseignements confidentiels et privilégiés. Si vous n'êtes pas le destinataire prévu, toute divulgation, duplication, utilisation ou distribution du courrier ou de toute pièce jointe est interdite. Si vous avez reçu cette communication par erreur, veuillez nous en aviser par courrier et détruire immédiatement le courrier et les pièces jointes. Merci.
>
> ******************************************************************************

--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
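
For reference, the ordering asked for in this thread (process each part, save in groups of N, flush the leftover group, and only then continue) can be sketched in plain Java without Camel. This is only an illustration of the intended semantics, not Camel API and not a fix for the route above: the names BatchThenContinue, Batcher and run are hypothetical, the sketch is single-threaded, and it replaces the completionTimeout with an explicit flush once the input is exhausted. It does not model streaming or parallelProcessing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Camel-free sketch of "split, batch by N, flush remainder, then continue".
public class BatchThenContinue {

    /** Buffers items and hands each full group of batchSize items to the sink. */
    static final class Batcher<T> {
        private final int batchSize;
        private final Consumer<List<T>> sink;
        private final List<T> buffer = new ArrayList<>();

        Batcher(int batchSize, Consumer<List<T>> sink) {
            this.batchSize = batchSize;
            this.sink = sink;
        }

        /** Adds one item; emits a group as soon as the buffer reaches batchSize. */
        void add(T item) {
            buffer.add(item);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        /** Emits whatever is buffered (possibly a partial group); no-op when empty. */
        void flush() {
            if (!buffer.isEmpty()) {
                sink.accept(new ArrayList<>(buffer)); // copy so the sink owns its list
                buffer.clear();
            }
        }
    }

    /** Returns the log lines in the order the steps ran. */
    public static List<String> run(String input, int batchSize) {
        List<String> log = new ArrayList<>();
        Batcher<String> batcher =
                new Batcher<>(batchSize, group -> log.add("saving msg group " + group));

        for (String part : input.split(",")) {
            log.add("processing the msg: " + part); // per-part processing
            batcher.add(part);                       // group by size
        }
        batcher.flush();                             // leftover group, before continuing
        log.add("*** COMPLETED split+aggregate processing");
        return log;
    }

    public static void main(String[] args) {
        run("AAA,BBB,CCC,DDD,EEE", 2).forEach(System.out::println);
    }
}
```

With the sample input and a group size of 2, the leftover group containing EEE is saved before the COMPLETED line, which is the ordering the original output is missing.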