In my case, the originating request comes from CXF. How do I send the aggregated response back to CXF?
On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote: > The consumer that handles the aggregated/timed-out request or response. > > I have to resend a few times if it's the request, I simply feed it back into > "direct:socketRequestRoute" with the header for the number of retry attempts > incremented. > If it's the response I can forward to some process. > > Taariq > > On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote: > >> What's listening on the: >> >> to("direct:requestResponse") >> >> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> wrote: >>> Sure >>> >>> You can of course solve what I've described many ways, but I'll >>> explain using 3 routes as that's what I used. >>> >>> This first route is the main route I mentioned earlier, so you send >>> your socket messages here and it's multicast to both the aggregator >>> and to the socket. >>> >>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator", >>> "someOutboundSocketEndpoint"); >>> >>> >>> This next route will aggregate, both requests and responses are sent >>> here as you envisaged. >>> from("direct:requestResponseAggregator"). >>> .aggregate(header("someCorrellationId"), >>> requestResponseAggregator) >>> .completionSize(2) >>> .completionTimeout(5000) >>> .to("direct:requestResponse"); //Here you can send the >>> "aggregated" message, in my case it's only the response I forward >>> unless there's a timeout, then I forward the request of course. >>> >>> Finally the route that consumes the socket responses. >>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator"); >>> //this headerEnricher doesn't have to be a processor, you have many >>> options to add a header. >>> >>> If that's not clear feel free to ask. >>> >>> Taariq >>> >>> >>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman >>> <ja...@carmanconsulting.com> wrote: >>>> Care to share an example? I'm not picturing it. >>>> >>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote: >>>>> Hi James >>>>> >>>>> I did that too for what it's worth. >>>>> I send the message to a route that forwards to both the aggregator and to >>>>> the socket. >>>>> When the response comes in I use an enricher to add the ID to the headers >>>>> and then forward to the aggregator. >>>>> >>>>> Taariq >>>>> >>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> >>>>> wrote: >>>>> >>>>>> Willem, >>>>>> >>>>>> Thank you for your help. I don't think this is doing exactly what I >>>>>> need, though. The real trick here is the asynchronous nature of the >>>>>> "server" on the other end of this situation. I thought about using an >>>>>> aggregator to make sure the response gets matched up with the request >>>>>> using a correlation id. The aggregator wouldn't aggregate multiple >>>>>> responses together into one, it would just make sure it matches the >>>>>> correct response with its request. Does this sound like a valid >>>>>> approach? If so, how the heck do I go about it? :) >>>>>> >>>>>> Thanks, >>>>>> >>>>>> James >>>>>> >>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com> >>>>>> wrote: >>>>>>> Hi James, >>>>>>> >>>>>>> Camel async process engine already provides the way that you want. >>>>>>> You can take a look at the camel-cxf code[1][2] for some example. >>>>>>> >>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup >>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup >>>>>>> >>>>>>> On 8/7/11 1:29 AM, James Carman wrote: >>>>>>>> >>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hzbar...@gmail.com> >>>>>>>> wrote: >>>>>>>>> >>>>>>>>> Hi James, >>>>>>>>> >>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. >>>>>>>>> I >>>>>>>>> assume want to use camel-netty [1] to send messages to your sever (if >>>>>>>>> you >>>>>>>>> have your own code that does that, you can use it too, but you'd have >>>>>>>>> to >>>>>>>>> write your own Processor or Component). Iiuic, your scenario is >>>>>>>>> converting a >>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your >>>>>>>>> exchange as >>>>>>>>> an async in-out and let your framework (Camel) decompose it and >>>>>>>>> compose it >>>>>>>>> back again. I would not keep threads blocked so I believe your best >>>>>>>>> bet is >>>>>>>>> using the Camel async messaging [2] and Futures (look at the examples >>>>>>>>> using >>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is stateless >>>>>>>>> so >>>>>>>>> you'll need a correlationId, which you must have already and >>>>>>>>> something to >>>>>>>>> keep your state. A good bet would be jms [3], or you could write your >>>>>>>>> own. >>>>>>>>> If you used jms you would need to use both a correlationId and a >>>>>>>>> replyTo >>>>>>>>> queue. >>>>>>>>> >>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId"); >>>>>>>>> from("netty:input).to("jms:replyTo-queue") >>>>>>>>> >>>>>>>> >>>>>>>> Perhaps a bit more information might be appropriate here. Eventually, >>>>>>>> I'd like to "expose" this route via web services (using CXF of >>>>>>>> course). So, I would need to either block the request thread, waiting >>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous >>>>>>>> processing stuff (I'm thinking this might help us get more done with >>>>>>>> less http request threads) to do more of a continuation thing. >>>>>>>> >>>>>>>> We already have a correlation id. The "protocol" requires one and the >>>>>>>> server process just echos it back in the response message. >>>>>>>> >>>>>>>>> You may have to play a bit with the correlationId and if you cannot >>>>>>>>> use >>>>>>>>> the same you can do a second transformation/correlation using a >>>>>>>>> claim-check >>>>>>>>> sort of pattern. If you don't want to use jms you can implement your >>>>>>>>> own (in >>>>>>>>> memory) persistence and correlation. You can also use a resequencer >>>>>>>>> [4] if >>>>>>>>> you want to enforce the order. If you use asyncCallback, you get the >>>>>>>>> replies >>>>>>>>> when they become available, and you can control that. >>>>>>>>> >>>>>>>> >>>>>>>> I don't think a resequencer is necessary. I don't want to guarantee >>>>>>>> the ordering. I'm mostly interested in throughput here. So, if a >>>>>>>> message comes in after another, but it can be processed faster, so be >>>>>>>> it. >>>>>>>> >>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, >>>>>>>>> but I >>>>>>>>> hope this helps. >>>>>>>>> Hadrian >>>>>>>>> >>>>>>>> >>>>>>>> You have been very helpful. Thank you for taking the time! >>>>>>>> >>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Willem >>>>>>> ---------------------------------- >>>>>>> FuseSource >>>>>>> Web: http://www.fusesource.com >>>>>>> Blog: http://willemjiang.blogspot.com (English) >>>>>>> http://jnn.javaeye.com (Chinese) >>>>>>> Twitter: willemjiang >>>>>>> Weibo: willemjiang >>>>>>> >>>>> >>>> >>> >