It looks like the splitter exits (and the route carries on) while your aggregator is still waiting on the completion timeout for the last, incomplete group, if I'm grokking the code right.
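
For what it's worth, here is a minimal plain-Java sketch in the spirit of what Claus suggests: do the grouping as part of the split itself, so the last partial group is flushed when the split ends rather than by a timer. This is not Camel API. The class SplitGroupSketch and the method splitAndGroup are made-up names for illustration, and the sketch ignores streaming and parallel processing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: plain Java, no Camel classes involved.
class SplitGroupSketch {

    // Splits the body on commas and collects the parts into groups of at
    // most groupSize. The leftover partial group is flushed at the end of
    // the loop, so nothing depends on a timeout firing later.
    static List<List<String>> splitAndGroup(String body, int groupSize) {
        List<List<String>> groups = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String part : body.split(",")) {
            current.add(part);
            if (current.size() == groupSize) {
                groups.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            groups.add(current); // final partial group, flushed synchronously
        }
        return groups;
    }

    public static void main(String[] args) {
        for (List<String> group : splitAndGroup("AAA,BBB,CCC,DDD,EEE", 2)) {
            System.out.println("saving msg group " + group);
        }
        // Reached only after every group, including the last one, is saved.
        System.out.println("*** COMPLETED split+aggregate processing");
    }
}
```

Running main prints the three groups first and the COMPLETED line last. In a real route the equivalent logic would presumably live in the AggregationStrategy passed to the splitter; that is an assumption, not something verified against 2.17.3.
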
On Nov 4, 2016 3:49 PM, "Craig Washington" <craig.washing...@aexp.com.invalid> wrote:
> Thanks Brad.
> Just now taking a look at the resequencer. In my case the final message
> and ordering aren’t important, only the timing of the
> split/group/completion.
>
> The resequencer may work; likewise, I suppose I could get away with using
> delay() and some sufficiently large value after the split. There just
> seems to be a better way that I’m missing.
>
> On 11/4/16, 8:54 AM, "Brad Johnson" <brad.john...@mediadriver.com> wrote:
>
> @Craig,
>
> This may or may not be appropriate, depending on your situation, but I
> recently ran into a case where I was parallel processing and making
> external REST calls to another company, so one couldn't count on uniform
> transaction speed, and an exception absolutely bogged a thread down. One
> example of the speed differential was that the footer that came in the
> input file had to be written out last, and it required no processing, so
> it zipped through.
>
> What I ended up doing is, right after my splitter, I added a header with
> an incremented value so I'd know what order they came in. Then I'd drop it
> into a SEDA queue with multiple threads for consumers. Right before I
> would write it to the output file, I put in a resequencer with a timeout.
> That timeout value was of sufficient length to ensure that values on the
> resequence queue would get shuffled to the correct order before getting
> output.
>
> The only item I wished were different about the resequencer is a reset on
> the timeout of an item if another item came in with a lower number. If the
> footer was number 20 and 4 seconds later number 18 came in, I'd prefer
> number 20's timeout to be reset to help ensure that differences in order
> be naturally taken care of.
>
> Anyway, your aggregation strategy could assign an order number and before
> writing it out you drop it on the resequencer queue so if another one with
> a lower number comes in after it they get output in their natural order.
> Someday if I get some time I'd like to modify the resequencer to have that
> behavior, if not by default, then at least settable via a flag. I can't
> really think of a case where I wouldn't want the timer reset when a
> resequence on order of messages occurred. After all, the purpose is to
> ensure the ordering is correct independent of the timing.
>
> Like I said, that may or may not help in your situation. I don't know if
> I've used the composed message EIP that Claus mentions so can't really
> comment.
>
> On Fri, Nov 4, 2016 at 1:37 AM, Claus Ibsen <claus.ib...@gmail.com> wrote:
>
> > See the composed message processor EIP with the splitter only
> > http://camel.apache.org/composed-message-processor.html
> >
> >
> > On Thu, Nov 3, 2016 at 11:32 PM, Craig Washington
> > <craig.washing...@aexp.com.invalid> wrote:
> > > Hello,
> > > I have a simple use case where I'd like to do the following:
> > > * split a message and process each part
> > > * aggregate parts into groups of size N for group-processing
> > >   (w/timeout to ensure no parts are lost)
> > > * continue route ONLY after all aggregated parts have completed
> > >
> > > The simplified route and output are as follows:
> > > ---
> > >
> > > public class CamelSplitAggregateWaitForCompletion extends CamelTestSupport {
> > >     @Test
> > >     public void test() throws Exception {
> > >         template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE");
> > >         Thread.sleep(3000);
> > >     }
> > >
> > >     @Override
> > >     protected RoutesBuilder createRouteBuilder() throws Exception {
> > >         return new RouteBuilder() {
> > >             @Override
> > >             public void configure() throws Exception {
> > >                 from("direct:start")
> > >                     .split(body().tokenize(","), new UseLatestAggregationStrategy())
> > >                     .streaming()
> > >                         //process each msg part
> > >                         .log("processing the msg: ${body}")
> > >                         //group by size
> > >                         .aggregate(constant(true), new GroupedMessageAggregationStrategy())
> > >                             .completionSize(2).completionTimeout(2000)
> > >                             //save grouped messages in batches
> > >                             .log("saving msg group ${body}")
> > >                             .to("mock:result")
> > >                         //end the aggregate processing
> > >                         .end()
> > >                     //end the split processing
> > >                     .end()
> > >                     .log("*** COMPLETED split+aggregate processing");
> > >
> > >                 //...do some more stuff here ONLY after all parts are complete...
> > >             }
> > >         };
> > >     }
> > > }
> > >
> > > ---
> > > //output
> > > 15:28:52.072 INFO route1 - processing the msg: AAA
> > > 15:28:52.074 INFO route1 - processing the msg: BBB
> > > 15:28:52.075 INFO route1 - saving msg group List<Exchange>(2 elements)
> > > 15:28:52.076 INFO route1 - processing the msg: CCC
> > > 15:28:52.077 INFO route1 - processing the msg: DDD
> > > 15:28:52.077 INFO route1 - saving msg group List<Exchange>(2 elements)
> > > 15:28:52.078 INFO route1 - processing the msg: EEE
> > > 15:28:52.079 INFO route1 - *** COMPLETED split+aggregate processing
> > > 15:28:55.064 INFO route1 - saving msg group List<Exchange>(1 elements)
> > > ---
> > >
> > > Ideally, the "COMPLETED" line should print last (after the final
> > > aggregated group from timeout).
> > > Seems simple enough though I haven't found a way to get this working.
> > > Neither of the completionTimeout examples I've found in source nor CIA
> > > focus on the route timing after the split.
> > >
> > > (I'm actually trying to process a large file with streaming and
> > > parallelProcessing so completionFromBatchConsumer() wouldn't work,
> > > though I think this part is irrelevant)
> > >
> > > Using Camel 2.17.3
> > >
> > > Thanks
> > >
> > >
> > >
> > > American Express made the following annotations
> > > ******************************************************************************
> > > "This message and any attachments are solely for the intended recipient
> > > and may contain confidential or privileged information. If you are not
> > > the intended recipient, any disclosure, copying, use, or distribution of
> > > the information included in this message and any attachments is
> > > prohibited. If you have received this communication in error, please
> > > notify us by reply e-mail and immediately and permanently delete this
> > > message and any attachments. Thank you."
> > >
> > > American Express added the following comment: This message and any
> > > attachment it contains are reserved solely for the indicated recipient
> > > and may contain confidential and privileged information. If you are not
> > > the intended recipient, any disclosure, duplication, use, or
> > > distribution of the message or any attachment is prohibited. If you have
> > > received this communication in error, please notify us by mail and
> > > immediately destroy the message and its attachments. Thank you.
> > >
> > > ******************************************************************************
> >
> >
> >
> > --
> > Claus Ibsen
> > -----------------
> > http://davsclaus.com @davsclaus
> > Camel in Action 2: https://www.manning.com/ibsen2
> >
>