One thing that comes to mind is to hide all this parallel stuff inside a processor that would just spit out, on the other end, the result of processing all those messages. It would handle the grouping and serializing, which I guess would reduce the complexity of the route at the cost of a more complex processor. I have no better ideas anyway, so I will give it a go.
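A minimal sketch of what such a processor might do internally, in plain JDK Java with no Camel API involved. The names `GroupSerializer` and `submit` are hypothetical, and it assumes the group key has already been extracted from the message. Each group keeps a "tail" future; a new message is chained onto its group's tail, so messages in the same group run one at a time while different groups share the thread pool. A group's entry is dropped once its last task finishes, so idle groups do not accumulate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: serial within a group, parallel across groups.
final class GroupSerializer {
    // Last scheduled task per group; absent means the group is idle.
    private final ConcurrentMap<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final Executor pool; // assumed to be a real thread pool, not a same-thread executor

    GroupSerializer(Executor pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> submit(String group, Runnable task) {
        CompletableFuture<Void> next = tails.compute(group, (g, tail) -> {
            CompletableFuture<Void> prev =
                    (tail == null) ? CompletableFuture.<Void>completedFuture(null) : tail;
            // handle(...) swallows a failure of the previous task so that
            // one bad message does not block the rest of its group.
            return prev.handle((result, error) -> null).thenRunAsync(task, pool);
        });
        // Forget the group once this task is done, unless a newer task was chained meanwhile.
        next.whenComplete((result, error) -> tails.remove(group, next));
        return next;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        GroupSerializer serializer = new GroupSerializer(pool);
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        // Arrival order taken from the example in this thread.
        for (String id : new String[] {"A1", "A2", "B1", "C1", "B2", "A3", "C2", "B3"}) {
            String group = id.substring(0, 1);
            pending.add(serializer.submit(group,
                    () -> System.out.println(Thread.currentThread().getName() + " handled " + id)));
        }
        pending.forEach(CompletableFuture::join);
        pool.shutdown();
    }
}
```

Chaining also keeps the arrival order within a group, which is more than the use case needs but costs nothing. Inside a Camel processor, the `Runnable` would presumably wrap whatever `getProcessor(group)` returns; that wiring is left out here.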

On Tue, Apr 11, 2017 at 10:45 AM, Artur Jablonski <ajablon...@ravenpack.com> wrote:

> Hello,
>
> I don't think this route definition fits my use case, though I learnt
> a thing or two about the interesting patterns linked. Thanks!
>
> OK, so let me try to clarify the use case.
>
> 1. The stream is infinite; it's not a batch job. The messages keep on
>    coming from SQS 'all the time'.
>
> 2. The more important thing is parallel processing.
>
> Let A1 denote message 1 from group A, B2 message 2 from group B, etc.
>
> Let's say this is the order in which the messages happen to appear in
> the route from SQS:
>
> A1, A2, B1, C1, B2, A3, C2, B3
>
> What I am trying to achieve is grouping the messages that have to be
> processed sequentially (order doesn't matter as long as no two messages
> from the same group are processed at the same time).
> So I am trying to somehow get these streams:
>
> A1, A2, A3
>
> B1, B2, B3
>
> C1, C2
>
> So A1, B1 and C1 can be processed in parallel because they are from
> different groups, but the messages within a group need to be processed
> one by one.
>
> In my example there are 3 groups, but there can be many and I don't
> know what they are in advance. The processing logic between the groups
> is similar and is a function of the group, so I can get a processor for
> group A from a method call getProcessor(A), for B from getProcessor(B),
> etc.
>
> I am stuck at how to do that in Camel: since I don't know the groups in
> advance, I would need to create processing routes dynamically.
>
> Say the system starts and A1 arrives. There can't be any processor for
> group A yet, since it's the first message from the group, so I need to
> somehow dynamically add processing capability for group A to the route.
> Then, if the messages from group A stop arriving for some time, that
> processor could be removed.
>
> How to add the parallel part between the group messages is also blurry
> to me. One way of doing this, I was thinking, was to multicast to all
> the dynamically created processing routes for the groups and stick a
> filter before each so that only messages from a particular group can go
> through. From the multicast page:
>
> from("direct:a").multicast().parallelProcessing().to("direct:x",
> "direct:y", "direct:z");
>
> But here the x, y, z endpoints are hardcoded. I could write some custom
> multicast, I suppose, that searches the routes in CamelContext... not
> sure.
>
> Thanks
> Artur
>
> On Mon, Apr 3, 2017 at 1:36 PM, Artur Jablonski <ajablon...@ravenpack.com>
> wrote:
>
>> Hi Zoran,
>>
>> Thank you for such a detailed response. This looks very promising. I
>> will need to get my head around the aggregator pattern.
>> This week I will be busy with other tasks, but I will get back to it
>> as soon as I can to see if I can get Camel to work for the use case.
>>
>> Cheerio
>> Artur
>>
>> On Mon, Apr 3, 2017 at 11:09 AM, Zoran Regvart <zo...@regvart.com> wrote:
>>
>>> Hi Artur,
>>> I was thinking that the order of the messages would be important as
>>> you need to process them sequentially.
>>>
>>> So I think you could use the dynamic message routing[1] with
>>> aggregator[2], something like:
>>>
>>> from("aws-sqs:...")
>>>   .process("#preProcess")
>>>   .toD("direct:${header.nextRoute}");
>>>
>>> from("direct:parallel")...;
>>> from("direct:sequential").aggregate(simple("${header.group}")).completion..;
>>>
>>> So from your SQS queue you would use a processor to pre-process the
>>> message, whose responsibility would be to set the (custom) `nextRoute`
>>> and (custom) `group` headers. `nextRoute` would be `parallel` or
>>> `sequential`, and if `sequential` the messages would be aggregated
>>> using the `group` header.
>>>
>>> You would want to define your own custom aggregation strategy or use
>>> the completion* options that are available to you. There might also
>>> be a need to use seda[3] to fine-tune any parallel processing. You
>>> might throw in a data format unmarshaller[4] instead of the
>>> `preProcess` processor and use something like `${body.xyz} == foo` in
>>> the `toD` expression.
>>>
>>> And I would guess that you need to examine transactions or persistence
>>> at some point too, in case your aggregation step runs for a long time
>>> or if your use case is sensitive to message loss if interrupted --
>>> which would undoubtedly lead you back to using queues to separate
>>> those two ways of processing.
>>>
>>> HTH,
>>>
>>> zoran
>>>
>>> [1] https://camel.apache.org/message-endpoint.html
>>> [2] https://camel.apache.org/aggregator2.html
>>> [3] https://camel.apache.org/seda.html
>>> [4] https://camel.apache.org/data-format.html
>>>
>>> On Sat, Apr 1, 2017 at 1:09 PM, Artur Jablonski
>>> <ajablon...@ravenpack.com> wrote:
>>> > Hey Zoran.
>>> >
>>> > I read the patterns you mentioned again. In my use case the order of
>>> > processing within a group doesn't matter as long as two messages from
>>> > the same group are never processed in parallel. So I guess the
>>> > resequencer is out of the picture, unless I didn't get the intention.
>>> >
>>> > So what we are left with is the content based router. Sure. The
>>> > message comes, I can see what group it belongs to... And what next?
>>> > Perhaps it's the very first message from that group, so I would need
>>> > to trigger creating a route/processor for that group somehow; perhaps
>>> > messages from this group were processed before, in which case the
>>> > processor for the group should already exist...
>>> >
>>> > On 31 Mar 2017 7:58 p.m., "Zoran Regvart" <zo...@regvart.com> wrote:
>>> >
>>> >> Hi Artur,
>>> >> have a look at the Camel EIP page[1]; what you describe sounds to me
>>> >> like the Resequencer and Content based router patterns,
>>> >>
>>> >> zoran
>>> >>
>>> >> [1] https://camel.apache.org/eip.html
>>> >>
>>> >> On Fri, Mar 31, 2017 at 5:08 PM, Artur Jablonski
>>> >> <ajablon...@ravenpack.com> wrote:
>>> >> > Hello.
>>> >> >
>>> >> > I wonder if someone could push me in the right direction in trying
>>> >> > to express quite a curious case in a Camel route.
>>> >> >
>>> >> > Imagine there's a stream of messages, some of which can be
>>> >> > processed in parallel and some of which have to be processed
>>> >> > serially. You can group the messages that require serial
>>> >> > processing together by looking at the message body. You don't know
>>> >> > upfront how many groups can occur in the stream.
>>> >> >
>>> >> > The way I thought about doing this is having a route for each
>>> >> > message group. Since I don't know upfront how many and what groups
>>> >> > there will be, I would need to create routes dynamically. If a
>>> >> > message comes belonging to a group that doesn't have its handling
>>> >> > route, then I could create it (is that even possible??). Then if
>>> >> > there are no messages coming for a given group for some time, I
>>> >> > could remove the route for the group to clean up (is that
>>> >> > possible?)
>>> >> >
>>> >> > New to Camel
>>> >> >
>>> >> > Thx!
>>> >> > Artur
>>> >>
>>> >> --
>>> >> Zoran Regvart
>>>
>>> --
>>> Zoran Regvart