What's listening on the: to("direct:requestResponse")
On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> wrote: > Sure > > You can of course solve what I've described many ways, but I'll > explain using 3 routes as that's what I used. > > This first route is the main route I mentioned earlier, so you send > your socket messages here and it's multicast to both the aggregator > and to the socket. > > from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator", > "someOutboundSocketEndpoint"); > > > This next route will aggregate, both requests and responses are sent > here as you envisaged. > from("direct:requestResponseAggregator"). > .aggregate(header("someCorrellationId"), > requestResponseAggregator) > .completionSize(2) > .completionTimeout(5000) > .to("direct:requestResponse"); //Here you can send the > "aggregated" message, in my case it's only the response I forward > unless there's a timeout, then I forward the request of course. > > Finally the route that consumes the socket responses. > from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator"); > //this headerEnricher doesn't have to be a processor, you have many > options to add a header. > > If that's not clear feel free to ask. > > Taariq > > > On Tue, Aug 16, 2011 at 9:30 PM, James Carman > <ja...@carmanconsulting.com> wrote: >> Care to share an example? I'm not picturing it. >> >> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote: >>> Hi James >>> >>> I did that too for what it's worth. >>> I send the message to a route that forwards to both the aggregator and to >>> the socket. >>> When the response comes in I use an enricher to add the ID to the headers >>> and then forward to the aggregator. >>> >>> Taariq >>> >>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote: >>> >>>> Willem, >>>> >>>> Thank you for your help. I don't think this is doing exactly what I >>>> need, though. The real trick here is the asynchronous nature of the >>>> "server" on the other end of this situation. I thought about using an >>>> aggregator to make sure the response gets matched up with the request >>>> using a correlation id. The aggregator wouldn't aggregate multiple >>>> responses together into one, it would just make sure it matches the >>>> correct response with its request. Does this sound like a valid >>>> approach? If so, how the heck do I go about it? :) >>>> >>>> Thanks, >>>> >>>> James >>>> >>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com> >>>> wrote: >>>>> Hi James, >>>>> >>>>> Camel async process engine already provides the way that you want. >>>>> You can take a look at the camel-cxf code[1][2] for some example. >>>>> >>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup >>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup >>>>> >>>>> On 8/7/11 1:29 AM, James Carman wrote: >>>>>> >>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hzbar...@gmail.com> >>>>>> wrote: >>>>>>> >>>>>>> Hi James, >>>>>>> >>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I >>>>>>> assume want to use camel-netty [1] to send messages to your sever (if >>>>>>> you >>>>>>> have your own code that does that, you can use it too, but you'd have to >>>>>>> write your own Processor or Component). Iiuic, your scenario is >>>>>>> converting a >>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your >>>>>>> exchange as >>>>>>> an async in-out and let your framework (Camel) decompose it and compose >>>>>>> it >>>>>>> back again. I would not keep threads blocked so I believe your best bet >>>>>>> is >>>>>>> using the Camel async messaging [2] and Futures (look at the examples >>>>>>> using >>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is stateless so >>>>>>> you'll need a correlationId, which you must have already and something >>>>>>> to >>>>>>> keep your state. A good bet would be jms [3], or you could write your >>>>>>> own. >>>>>>> If you used jms you would need to use both a correlationId and a replyTo >>>>>>> queue. >>>>>>> >>>>>>> from("jms:request-queue").to("netty:output?=correlationId"); >>>>>>> from("netty:input).to("jms:replyTo-queue") >>>>>>> >>>>>> >>>>>> Perhaps a bit more information might be appropriate here. Eventually, >>>>>> I'd like to "expose" this route via web services (using CXF of >>>>>> course). So, I would need to either block the request thread, waiting >>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous >>>>>> processing stuff (I'm thinking this might help us get more done with >>>>>> less http request threads) to do more of a continuation thing. >>>>>> >>>>>> We already have a correlation id. The "protocol" requires one and the >>>>>> server process just echos it back in the response message. >>>>>> >>>>>>> You may have to play a bit with the correlationId and if you cannot use >>>>>>> the same you can do a second transformation/correlation using a >>>>>>> claim-check >>>>>>> sort of pattern. If you don't want to use jms you can implement your >>>>>>> own (in >>>>>>> memory) persistence and correlation. You can also use a resequencer [4] >>>>>>> if >>>>>>> you want to enforce the order. If you use asyncCallback, you get the >>>>>>> replies >>>>>>> when they become available, and you can control that. >>>>>>> >>>>>> >>>>>> I don't think a resequencer is necessary. I don't want to guarantee >>>>>> the ordering. I'm mostly interested in throughput here. So, if a >>>>>> message comes in after another, but it can be processed faster, so be >>>>>> it. >>>>>> >>>>>>> It's an interesting scenario, I'll definitely give it more thought, but >>>>>>> I >>>>>>> hope this helps. >>>>>>> Hadrian >>>>>>> >>>>>> >>>>>> You have been very helpful. Thank you for taking the time! >>>>>> >>>>> >>>>> >>>>> -- >>>>> Willem >>>>> ---------------------------------- >>>>> FuseSource >>>>> Web: http://www.fusesource.com >>>>> Blog: http://willemjiang.blogspot.com (English) >>>>> http://jnn.javaeye.com (Chinese) >>>>> Twitter: willemjiang >>>>> Weibo: willemjiang >>>>> >>> >> >