It appears that the splitter is exiting while your aggregator is still
stuck waiting for the time out if I'm groking the code right.

On Nov 4, 2016 3:49 PM, "Craig Washington"
<craig.washing...@aexp.com.invalid> wrote:

> Thanks Brad.
> Just now taking a look at the resequencer. In my case the final message
> and ordering
> aren’t important, only the timing of the split/group/completion.
>
> The resequncer may work, likewise I suppose I could get away with using
> delay() and
> some sufficiently large value after the split, there just seems to be a
> better way that
> I’m missing.
>
> On 11/4/16, 8:54 AM, "Brad Johnson" <brad.john...@mediadriver.com> wrote:
>
>     @Craig,
>
>     This may or may not be appropriate to your situation depending on your
>     situation, but I recently ran into a situation where I was parallel
>     processing and making external REST calls to another company so one
>     couldn't count on uniform transaction speed and an exception absolutely
>     bogged a thread down. One example of the speed differential was the
> footer
>     that came in the input file had to be written out last and it required
> no
>     processing so it zipped through.
>
>     What I ended up doing is right after my splitter I added a header with
> an
>     incremented value so I'd know what order they came in. Then I'd drop it
>     into a SEDA queue with multiple threads for consumers. Right before I
> would
>     write it to the output file, I put in a resequencer with a time out.
> That
>     time out value was of sufficient length to ensure that values on the
>     resequence queue would get shuffled to correct order  before getting
> output.
>
>     The only item I wished were different about the resequencer is a reset
> on
>     the timeout of an item if another item came in with a lower number.
> If the
>     footer was number 20 and 4 seconds later number 18 came in, I'd prefer
>     number 20's timeout to be reset to help ensure that differences in
> order be
>     naturally taken care of.
>
>     Anyway, your aggregation strategy could assign an order number and
> before
>     writing it out you drop it on the resequencer queue so if another one
> with
>     a lower number comes in after it they get output in their natural
> order.
>     Someday if I get some time I'd like to modify the resequencer to have
> that
>     behavior, if not by default, then at least settable via a flag.  I
> can't
>     really think of a case where I wouldn't want the timer reset when a
>     resequence on order of messages occurred.  After all, the purpose is to
>     ensure the ordering is correct independent of the timing.
>
>     Like I said, that may or may not help in your situation. I don't know
> if
>     I've used the composed message EIP that Claus mentions so can't really
>     comment.
>
>     On Fri, Nov 4, 2016 at 1:37 AM, Claus Ibsen <claus.ib...@gmail.com>
> wrote:
>
>     > See the composed message processor EIP with the splitter only
>     > http://camel.apache.org/composed-message-processor.html
>     >
>     >
>     > On Thu, Nov 3, 2016 at 11:32 PM, Craig Washington
>     > <craig.washing...@aexp.com.invalid> wrote:
>     > > Hello,
>     > > I have a simple use case where I'd like to do the following:
>     > > * split a message and process each part
>     > > * aggregate parts into groups of size N for group-processing
> (w/timeout
>     > to ensure no parts are lost)
>     > > * continue route ONLY after all aggregated parts have completed
>     > >
>     > > The simplified route and output are as follows:
>     > > ---
>     > >
>     > > public class CamelSplitAggregateWaitForCompletion extends
>     > CamelTestSupport {
>     > >     @Test
>     > >     public void test() throws Exception {
>     > >         template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE");
>     > >         Thread.sleep(3000);
>     > >     }
>     > >
>     > >     @Override
>     > >     protected RoutesBuilder createRouteBuilder() throws Exception {
>     > >         return new RouteBuilder() {
>     > >             @Override
>     > >             public void configure() throws Exception {
>     > >                 from("direct:start")
>     > >                     .split(body().tokenize(","), new
>     > UseLatestAggregationStrategy())
>     > >                     .streaming()
>     > >                         //process each msg part
>     > >                         .log("processing the msg: ${body}")
>     > >                         //group by size
>     > >                         .aggregate(constant(true), new
>     > GroupedMessageAggregationStrategy())
>     > >                         .completionSize(2).completionTimeout(2000)
>     > >                             //save grouped messages in batches
>     > >                             .log("saving msg group ${body}")
>     > >                             .to("mock:result")
>     > >                         //end the aggregate processing
>     > >                         .end()
>     > >                     //end the split processing
>     > >                     .end()
>     > >                     .log("*** COMPLETED split+aggregate
> processing");
>     > >
>     > >                     //...do some more stuff here ONLY after all
> parts
>     > are complete...
>     > >             }
>     > >         };
>     > >     }
>     > > }
>     > >
>     > > ---
>     > > //output
>     > > 15:28:52.072 INFO  route1 - processing the msg: AAA
>     > > 15:28:52.074 INFO  route1 - processing the msg: BBB
>     > > 15:28:52.075 INFO  route1 - saving msg group List<Exchange>(2
> elements)
>     > > 15:28:52.076 INFO  route1 - processing the msg: CCC
>     > > 15:28:52.077 INFO  route1 - processing the msg: DDD
>     > > 15:28:52.077 INFO  route1 - saving msg group List<Exchange>(2
> elements)
>     > > 15:28:52.078 INFO  route1 - processing the msg: EEE
>     > > 15:28:52.079 INFO  route1 - *** COMPLETED split+aggregate
> processing
>     > > 15:28:55.064 INFO  route1 - saving msg group List<Exchange>(1
> elements)
>     > > ---
>     > >
>     > > Ideally, the "COMPLETED" line should print last (after the final
>     > aggregated group from timeout).
>     > > Seems simple enough though I haven't found a way to get this
> working.
>     > Neither of the completionTimeout examples I've found in source nor
> CIA
>     > focus on the route timing after the split.
>     > >
>     > > (I'm actually trying to process a large file with streaming and
>     > parallelProcessing so completionFromBatchConsumer() wouldn't work,
> though I
>     > think this part is irrelevant)
>     > >
>     > > Using Camel 2.17.3
>     > >
>     > > Thanks
>     > >
>     > >
>     > >
>     > > American Express made the following annotations
>     > > ************************************************************
>     > ******************
>     > > "This message and any attachments are solely for the intended
> recipient
>     > and may contain confidential or privileged information. If you are
> not the
>     > intended recipient, any disclosure, copying, use, or distribution of
> the
>     > information included in this message and any attachments is
> prohibited. If
>     > you have received this communication in error, please notify us by
> reply
>     > e-mail and immediately and permanently delete this message and any
>     > attachments. Thank you."
>     > >
>     > > American Express a ajouté le commentaire suivant le Ce courrier et
> toute
>     > pièce jointe qu'il contient sont réservés au seul destinataire
> indiqué et
>     > peuvent renfermer des
>     > > renseignements confidentiels et privilégiés. Si vous n'êtes pas le
>     > destinataire prévu, toute divulgation, duplication, utilisation ou
>     > distribution du courrier ou de toute pièce jointe est interdite. Si
> vous
>     > avez reçu cette communication par erreur, veuillez nous en aviser par
>     > courrier et détruire immédiatement le courrier et les pièces
> jointes. Merci.
>     > >
>     > > ************************************************************
>     > ******************
>     >
>     >
>     >
>     > --
>     > Claus Ibsen
>     > -----------------
>     > http://davsclaus.com @davsclaus
>     > Camel in Action 2: https://www.manning.com/ibsen2
>     >
>
>
>
>
> American Express made the following annotations
> ************************************************************
> ******************
> "This message and any attachments are solely for the intended recipient
> and may contain confidential or privileged information. If you are not the
> intended recipient, any disclosure, copying, use, or distribution of the
> information included in this message and any attachments is prohibited. If
> you have received this communication in error, please notify us by reply
> e-mail and immediately and permanently delete this message and any
> attachments. Thank you."
>
> American Express a ajouté le commentaire suivant le Ce courrier et toute
> pièce jointe qu'il contient sont réservés au seul destinataire indiqué et
> peuvent renfermer des
> renseignements confidentiels et privilégiés. Si vous n'êtes pas le
> destinataire prévu, toute divulgation, duplication, utilisation ou
> distribution du courrier ou de toute pièce jointe est interdite. Si vous
> avez reçu cette communication par erreur, veuillez nous en aviser par
> courrier et détruire immédiatement le courrier et les pièces jointes. Merci.
>
> ************************************************************
> ******************
>

Reply via email to