@Craig,

This may or may not fit your situation, but I recently hit a similar
problem. I was processing in parallel and making external REST calls to
another company, so I couldn't count on uniform transaction speed, and an
exception absolutely bogged a thread down. One example of the speed
differential: the footer from the input file had to be written out last,
but it required no processing, so it zipped through.

What I ended up doing: right after my splitter I added a header with an
incrementing value so I'd know what order the parts came in. Then I
dropped each part onto a SEDA queue with multiple consumer threads. Right
before writing to the output file, I put in a resequencer with a timeout.
The timeout was long enough to ensure that items held by the resequencer
would get shuffled back into the correct order before being output.
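
A rough sketch of that layout in the Java DSL, in case it helps. This is
not my actual route: the endpoint URIs, the "seqNo" header name, the
consumer count and the 10 second timeout are all made up for illustration,
and instead of my own counter it reuses the CamelSplitIndex property that
the splitter already sets. It assumes Camel 2.x on the classpath and uses
the batch flavour of the resequencer.

```java
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;

// Illustrative only: every URI and name below is a placeholder.
class SequencedParallelRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // 1. Split, tag each part with its position, hand it to a SEDA queue.
        from("direct:start")
            .split(body().tokenize("\n")).streaming()
                .setHeader("seqNo", exchangeProperty(Exchange.SPLIT_INDEX))
                .to("seda:work")
            .end();

        // 2. Several consumer threads do the slow work, so parts finish
        //    in an unpredictable order.
        from("seda:work?concurrentConsumers=8")
            .to("direct:slowRestCall")   // placeholder for the real processing
            .to("seda:ordered");

        // 3. Collect for up to 10 seconds, sort by seqNo, then write out.
        from("seda:ordered")
            .resequence(header("seqNo")).batch().timeout(10000)
            .to("mock:output");
    }
}
```

The timeout has to be longer than the slowest part is expected to take,
otherwise a late part still ends up after its successors.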

The only thing I wish were different about the resequencer is that it
would reset an item's timeout when another item came in with a lower
number. If the footer was number 20 and number 18 came in 4 seconds later,
I'd prefer number 20's timeout to be reset, so that differences in
arrival order get taken care of naturally.
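
To make the behavior I have in mind concrete, here is a small plain-Java
sketch. It is not Camel code and not how the Camel resequencer works
today; the class name and methods are hypothetical, and the caller passes
in the current time (assumed never to go backwards) so the logic is easy
to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: a resequencer whose timers restart when a
// lower-numbered item arrives late.
class ResettingResequencer {
    private static final class Pending {
        final String payload;
        long deadline;
        Pending(String payload, long deadline) {
            this.payload = payload;
            this.deadline = deadline;
        }
    }

    private final long timeoutMillis;
    private final TreeMap<Integer, Pending> buffer = new TreeMap<>();

    ResettingResequencer(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Buffer an item; 'now' is the caller-supplied current time in millis.
    void offer(int seq, String payload, long now) {
        long deadline = now + timeoutMillis;
        // A lower number showed up: restart the timer of everything
        // already waiting with a higher number.
        for (Pending later : buffer.tailMap(seq, false).values()) {
            later.deadline = deadline;
        }
        buffer.put(seq, new Pending(payload, deadline));
    }

    // Return, in sequence order, the items whose timeout has expired.
    // The reset keeps deadlines non-decreasing in sequence order, so
    // checking the lowest-numbered entry is enough.
    List<String> release(long now) {
        List<String> out = new ArrayList<>();
        while (!buffer.isEmpty()
                && buffer.firstEntry().getValue().deadline <= now) {
            out.add(buffer.pollFirstEntry().getValue().payload);
        }
        return out;
    }
}
```

With a 5 second timeout, a footer numbered 20 arriving at t=0 and number
18 arriving at t=4s, the footer is held until t=9s instead of going out
at t=5s.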

Anyway, your aggregation strategy could assign an order number, and before
writing a group out you could drop it on the resequencer, so that if
another one with a lower number comes in after it, they are output in
their natural order. Someday, if I get some time, I'd like to modify the
resequencer to have that reset behavior, if not by default, then at least
settable via a flag. I can't really think of a case where I wouldn't want
the timer reset when messages get reordered. After all, the purpose is to
ensure the ordering is correct independent of the timing.

Like I said, that may or may not help in your situation. I don't think
I've used the composed message processor EIP that Claus mentions, so I
can't really comment on it.

On Fri, Nov 4, 2016 at 1:37 AM, Claus Ibsen <claus.ib...@gmail.com> wrote:

> See the composed message processor EIP with the splitter only
> http://camel.apache.org/composed-message-processor.html
>
>
> On Thu, Nov 3, 2016 at 11:32 PM, Craig Washington
> <craig.washing...@aexp.com.invalid> wrote:
> > Hello,
> > I have a simple use case where I'd like to do the following:
> > * split a message and process each part
> > * aggregate parts into groups of size N for group-processing (w/timeout to ensure no parts are lost)
> > * continue route ONLY after all aggregated parts have completed
> >
> > The simplified route and output are as follows:
> > ---
> >
> > public class CamelSplitAggregateWaitForCompletion extends CamelTestSupport {
> >     @Test
> >     public void test() throws Exception {
> >         template.sendBody("direct:start", "AAA,BBB,CCC,DDD,EEE");
> >         Thread.sleep(3000);
> >     }
> >
> >     @Override
> >     protected RoutesBuilder createRouteBuilder() throws Exception {
> >         return new RouteBuilder() {
> >             @Override
> >             public void configure() throws Exception {
> >                 from("direct:start")
> >                     .split(body().tokenize(","), new UseLatestAggregationStrategy())
> >                     .streaming()
> >                         //process each msg part
> >                         .log("processing the msg: ${body}")
> >                         //group by size
> >                         .aggregate(constant(true), new GroupedMessageAggregationStrategy())
> >                         .completionSize(2).completionTimeout(2000)
> >                             //save grouped messages in batches
> >                             .log("saving msg group ${body}")
> >                             .to("mock:result")
> >                         //end the aggregate processing
> >                         .end()
> >                     //end the split processing
> >                     .end()
> >                     .log("*** COMPLETED split+aggregate processing");
> >
> >                     //...do some more stuff here ONLY after all parts are complete...
> >             }
> >         };
> >     }
> > }
> >
> > ---
> > //output
> > 15:28:52.072 INFO  route1 - processing the msg: AAA
> > 15:28:52.074 INFO  route1 - processing the msg: BBB
> > 15:28:52.075 INFO  route1 - saving msg group List<Exchange>(2 elements)
> > 15:28:52.076 INFO  route1 - processing the msg: CCC
> > 15:28:52.077 INFO  route1 - processing the msg: DDD
> > 15:28:52.077 INFO  route1 - saving msg group List<Exchange>(2 elements)
> > 15:28:52.078 INFO  route1 - processing the msg: EEE
> > 15:28:52.079 INFO  route1 - *** COMPLETED split+aggregate processing
> > 15:28:55.064 INFO  route1 - saving msg group List<Exchange>(1 elements)
> > ---
> >
> > Ideally, the "COMPLETED" line should print last (after the final aggregated group from timeout).
> > Seems simple enough though I haven't found a way to get this working. Neither of the completionTimeout examples I've found in source nor CIA focus on the route timing after the split.
> >
> > (I'm actually trying to process a large file with streaming and parallelProcessing so completionFromBatchConsumer() wouldn't work, though I think this part is irrelevant)
> >
> > Using Camel 2.17.3
> >
> > Thanks
> >
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>
