On Sat, Aug 27, 2011 at 10:19 AM, James Carman <ja...@carmanconsulting.com> wrote:
> Well, I can tell you that it certainly didn't seem to work the way I
> need it to work. I need a persistent connection (with automatic
> reconnects). I also need it to be in/out, but asynchronous (the
> current incoming message may or may not correspond to the most
> recently sent message). For now, I've resorted to just rolling my own
> solution by directly coding to the Netty API. I will re-visit with
> Camel later I'm sure. Thanks for your help.
>
Well, you could also consider contributing improvements to the Camel
components. The Camel community loves contributions:
http://camel.apache.org/contributing.html

> On Thu, Aug 25, 2011 at 1:16 AM, Taariq Levack <taar...@gmail.com> wrote:
>> I expect that the connection will only be closed if the header
>> NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE is true.
>>
>> Glancing at the code I see what you mean, it's quite unlike MINA's producer
>> which checks the session to see if it's connected and reuses it, but it may
>> be that under the hood, yet further under the hood, hehe, way further down
>> into netty's ClientBootstrap and beyond, the connection is being reused. I
>> don't know for sure.
>>
>> This is from Netty front-page, "True connectionless datagram socket support
>> (since 3.1)":
>> And glancing at that bit elsewhere I think it's possible to do without this
>> sort of plumbing, but you'd have to jump into netty code or docs to confirm.
>>
>> Depending on timing and other factors I would go ahead with a POC because it
>> either works or it will work, a failing test from your POC will be most
>> welcome.
>>
>> Taariq
>>
>>
>> On Wed, Aug 24, 2011 at 12:43 PM, James Carman
>> <ja...@carmanconsulting.com> wrote:
>>
>>> I have read the source:
>>>
>>>
>>> http://svn.apache.org/repos/asf/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
>>>
>>> Take a look at the process() method. In there, there is a block of
>>> code that does:
>>>
>>>        ChannelFuture channelFuture;
>>>        final Channel channel;
>>>        try {
>>>            channelFuture = openConnection(exchange, callback);
>>>            channel = openChannel(channelFuture);
>>>        } catch (Exception e) {
>>>            exchange.setException(e);
>>>            callback.done(true);
>>>            return true;
>>>        }
>>>
>>>
>>> This is not inside an if block or anything and the openConnection()
>>> method does actually open it, it isn't just returning a
>>> previously-opened connection or anything.
>>>
>>> Perhaps I'm missing something (entirely possible), but it appears that
>>> it's opening the connection every time the process() method is called.
>>>
>>> On Tue, Aug 23, 2011 at 11:09 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> > That doesn't sound right, what have you read? Logs/docs?
>>> > And are you using keep-alive?
>>> >
>>> > Taariq
>>> >
>>> >
>>> > On 24 Aug 2011, at 12:12 AM, James Carman <ja...@carmanconsulting.com> wrote:
>>> >
>>> >> Well, it looks like the camel-netty component won't work for me. It
>>> >> appears that it opens the connection for each exchange. Am I reading
>>> >> that right? What I need is a persistent connection with automatic
>>> >> reconnects. Oh well, back to the drawing board.
>>> >>
>>> >> On Wed, Aug 17, 2011 at 7:59 AM, James Carman
>>> >> <ja...@carmanconsulting.com> wrote:
>>> >>> That's what I've been staring at! :) Here's what I'm thinking I'm
>>> >>> going to need to write. I need an async processor that remembers the
>>> >>> AsyncCallback and associates it with a correlation id. Then, when
>>> >>> another exchange comes in that has the same correlation id, it will
>>> >>> lookup the previous callback and say that it's done. I have a lot of
>>> >>> questions, though. I've never had to get so "down and dirty" with
>>> >>> Camel before. The components have just worked for me "off the shelf."
>>> >>>
>>> >>> 1. Do I just copy the input message of the Exchange that comes in
>>> >>> second to the output message of the originating exchange?
>>> >>> 2. How do I do a timeout for the original caller (the CXF request)?
>>> >>> 3. How do I detect that the caller has timed out if they do?
>>> >>>
>>> >>> I'm sure I'll have more questions, but these are the ones off the top
>>> >>> of my head.
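The correlating processor described in the message above can be pictured outside of Camel. What follows is only a minimal sketch in plain Java (9 or later, for orTimeout), not anything that ships with Camel or camel-netty. PendingReplies and its methods are hypothetical names, and a CompletableFuture stands in for Camel's AsyncCallback. The registry keeps one pending future per correlation id, completes it when a reply carrying the same id arrives, and fails it with a TimeoutException if no reply arrives in time.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: pairs an outstanding request with the reply that echoes its correlation id. */
final class PendingReplies<T> {
    private final Map<String, CompletableFuture<T>> pending = new ConcurrentHashMap<>();

    /** Registers a request. The returned future fails with a TimeoutException if no reply arrives in time. */
    CompletableFuture<T> register(String correlationId, long timeoutMillis) {
        CompletableFuture<T> future = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, future) != null) {
            throw new IllegalStateException("Duplicate correlation id: " + correlationId);
        }
        // Fail the future if nothing completes it within the timeout (Java 9+).
        future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
        // Forget the entry however the future finishes: reply, timeout, or cancellation.
        future.whenComplete((reply, error) -> pending.remove(correlationId, future));
        return future;
    }

    /** Hands a reply to its waiter. Returns false if nobody is waiting (unknown id, or the caller timed out). */
    boolean complete(String correlationId, T reply) {
        CompletableFuture<T> future = pending.remove(correlationId);
        return future != null && future.complete(reply);
    }
}
```

In a Camel async processor, the point where the future completes is presumably where the reply would be copied onto the original exchange and the remembered AsyncCallback told it is done. A false return from complete() is one way to notice a reply whose caller has already given up.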
>>> >>>
>>> >>> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <taar...@gmail.com> wrote:
>>> >>>> James I think the rest of your puzzle is solved by Camel's async API,
>>> >>>> you might have to check if your task is done, maybe your
>>> >>>> requestResponse populates some collection of responses and provides
>>> >>>> some API to return the response given a correlationID.
>>> >>>> Stare at the async docs [1] a few more times and I'm sure you'll find
>>> >>>> your answer.
>>> >>>>
>>> >>>> [1] http://camel.apache.org/async.html
>>> >>>>
>>> >>>> Taariq
>>> >>>>
>>> >>>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman
>>> >>>> <ja...@carmanconsulting.com> wrote:
>>> >>>>> No worries! Thank you for your help. It helped me understand a bit
>>> >>>>> more about how these aggregators work.. However, I still don't
>>> >>>>> understand how to take care of my problem. I guess I'm going to have
>>> >>>>> to roll my own processor or something.
>>> >>>>>
>>> >>>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> >>>>>> Hmmm.
>>> >>>>>> Maybe others can help with that if it's possible, I haven't had to
>>> >>>>>> wrestle with it.
>>> >>>>>>
>>> >>>>>> In my case it is actually a cxf service too, but it's asynchronous
>>> >>>>>> and I send the response once I have it, indicating either timeout or the
>>> >>>>>> actual response.
>>> >>>>>>
>>> >>>>>> Sorry I responded to your question without going back to see your
>>> >>>>>> other posts.
>>> >>>>>>
>>> >>>>>> Taariq
>>> >>>>>>
>>> >>>>>> On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>> >>>>>>
>>> >>>>>>> In my case, the originating request comes from CXF. How do I send the
>>> >>>>>>> aggregated response back to CXF?
>>> >>>>>>>
>>> >>>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> >>>>>>>> The consumer that handles the aggregated/timed-out request or response.
>>> >>>>>>>>
>>> >>>>>>>> I have to resend a few times if it's the request, I simply feed it
>>> >>>>>>>> back into "direct:socketRequestRoute" with the header for the number of
>>> >>>>>>>> retry attempts incremented.
>>> >>>>>>>> If it's the response I can forward to some process.
>>> >>>>>>>>
>>> >>>>>>>> Taariq
>>> >>>>>>>>
>>> >>>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>> >>>>>>>>
>>> >>>>>>>>> What's listening on the:
>>> >>>>>>>>>
>>> >>>>>>>>> to("direct:requestResponse")
>>> >>>>>>>>>
>>> >>>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> >>>>>>>>>> Sure
>>> >>>>>>>>>>
>>> >>>>>>>>>> You can of course solve what I've described many ways, but I'll
>>> >>>>>>>>>> explain using 3 routes as that's what I used.
>>> >>>>>>>>>>
>>> >>>>>>>>>> This first route is the main route I mentioned earlier, so you send
>>> >>>>>>>>>> your socket messages here and it's multicast to both the aggregator
>>> >>>>>>>>>> and to the socket.
>>> >>>>>>>>>>
>>> >>>>>>>>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator", "someOutboundSocketEndpoint");
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> This next route will aggregate, both requests and responses are sent
>>> >>>>>>>>>> here as you envisaged.
>>> >>>>>>>>>> from("direct:requestResponseAggregator")
>>> >>>>>>>>>>                .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>> >>>>>>>>>>                .completionSize(2)
>>> >>>>>>>>>>                .completionTimeout(5000)
>>> >>>>>>>>>>                .to("direct:requestResponse"); //Here you can send the
>>> >>>>>>>>>> "aggregated" message, in my case it's only the response I forward
>>> >>>>>>>>>> unless there's a timeout, then I forward the request of course.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Finally the route that consumes the socket responses.
>>> >>>>>>>>>>
>>> >>>>>>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>> >>>>>>>>>> //this headerEnricher doesn't have to be a processor, you have many
>>> >>>>>>>>>> options to add a header.
>>> >>>>>>>>>>
>>> >>>>>>>>>> If that's not clear feel free to ask.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Taariq
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>> >>>>>>>>>> <ja...@carmanconsulting.com> wrote:
>>> >>>>>>>>>>> Care to share an example? I'm not picturing it.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> >>>>>>>>>>>> Hi James
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> I did that too for what it's worth.
>>> >>>>>>>>>>>> I send the message to a route that forwards to both the
>>> >>>>>>>>>>>> aggregator and to the socket.
>>> >>>>>>>>>>>> When the response comes in I use an enricher to add the ID to
>>> >>>>>>>>>>>> the headers and then forward to the aggregator.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Taariq
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>> Willem,
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> Thank you for your help. I don't think this is doing exactly what I
>>> >>>>>>>>>>>>> need, though. The real trick here is the asynchronous nature of the
>>> >>>>>>>>>>>>> "server" on the other end of this situation. I thought about using an
>>> >>>>>>>>>>>>> aggregator to make sure the response gets matched up with the request
>>> >>>>>>>>>>>>> using a correlation id. The aggregator wouldn't aggregate multiple
>>> >>>>>>>>>>>>> responses together into one, it would just make sure it matches the
>>> >>>>>>>>>>>>> correct response with its request. Does this sound like a valid
>>> >>>>>>>>>>>>> approach? If so, how the heck do I go about it?
>>> >>>>>>>>>>>>> :)
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> Thanks,
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> James
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com> wrote:
>>> >>>>>>>>>>>>>> Hi James,
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Camel async process engine already provides the way that you want.
>>> >>>>>>>>>>>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> [1] http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>> >>>>>>>>>>>>>> [2] http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian <hzbar...@gmail.com>
>>> >>>>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Hi James,
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. I
>>> >>>>>>>>>>>>>>>> assume you want to use camel-netty [1] to send messages to your server (if you
>>> >>>>>>>>>>>>>>>> have your own code that does that, you can use it too, but you'd have to
>>> >>>>>>>>>>>>>>>> write your own Processor or Component). Iiuic, your scenario is converting a
>>> >>>>>>>>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your exchange as
>>> >>>>>>>>>>>>>>>> an async in-out and let your framework (Camel) decompose it and compose it
>>> >>>>>>>>>>>>>>>> back again. I would not keep threads blocked so I believe your best bet is
>>> >>>>>>>>>>>>>>>> using the Camel async messaging [2] and Futures (look at the examples using
>>> >>>>>>>>>>>>>>>> asyncSend* and asyncCallback*).
>>> >>>>>>>>>>>>>>>> The issue is that Camel is stateless so
>>> >>>>>>>>>>>>>>>> you'll need a correlationId, which you must have already and something to
>>> >>>>>>>>>>>>>>>> keep your state. A good bet would be jms [3], or you could write your own.
>>> >>>>>>>>>>>>>>>> If you used jms you would need to use both a correlationId and a replyTo
>>> >>>>>>>>>>>>>>>> queue.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>> >>>>>>>>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Perhaps a bit more information might be appropriate here. Eventually,
>>> >>>>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>> >>>>>>>>>>>>>>> course). So, I would need to either block the request thread, waiting
>>> >>>>>>>>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>> >>>>>>>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>> >>>>>>>>>>>>>>> less http request threads) to do more of a continuation thing.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> We already have a correlation id. The "protocol" requires one and the
>>> >>>>>>>>>>>>>>> server process just echoes it back in the response message.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> You may have to play a bit with the correlationId and if you cannot use
>>> >>>>>>>>>>>>>>>> the same you can do a second transformation/correlation using a claim-check
>>> >>>>>>>>>>>>>>>> sort of pattern. If you don't want to use jms you can implement your own (in
>>> >>>>>>>>>>>>>>>> memory) persistence and correlation. You can also use a resequencer [4] if
>>> >>>>>>>>>>>>>>>> you want to enforce the order. If you use asyncCallback, you get the replies
>>> >>>>>>>>>>>>>>>> when they become available, and you can control that.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> I don't think a resequencer is necessary. I don't want to guarantee
>>> >>>>>>>>>>>>>>> the ordering. I'm mostly interested in throughput here. So, if a
>>> >>>>>>>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>> >>>>>>>>>>>>>>> it.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, but I
>>> >>>>>>>>>>>>>>>> hope this helps.
>>> >>>>>>>>>>>>>>>> Hadrian
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> You have been very helpful. Thank you for taking the time!
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> --
>>> >>>>>>>>>>>>>> Willem
>>> >>>>>>>>>>>>>> ----------------------------------
>>> >>>>>>>>>>>>>> FuseSource
>>> >>>>>>>>>>>>>> Web: http://www.fusesource.com
>>> >>>>>>>>>>>>>> Blog: http://willemjiang.blogspot.com (English)
>>> >>>>>>>>>>>>>>         http://jnn.javaeye.com (Chinese)
>>> >>>>>>>>>>>>>> Twitter: willemjiang
>>> >>>>>>>>>>>>>> Weibo: willemjiang
>>> >>>>>>>>>>>>>>
>

--
Claus Ibsen
-----------------
FuseSource
Email: cib...@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/
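For the persistent-connection requirement raised at the top of the thread, the check-then-reuse behaviour that Taariq attributes to MINA's producer can be sketched independently of Netty. This is a rough illustration under stated assumptions, not how camel-netty works. ReusableConnection is a hypothetical name, the connector is assumed to block until the connection is open, and reconnection is lazy: it happens on the next get() after the liveness check fails, not in the background.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical holder that reuses one connection and reopens it only when it is no longer usable. */
final class ReusableConnection<C> {
    private final Supplier<C> connector;    // opens a new connection (assumed to block until connected)
    private final Predicate<C> isConnected; // liveness check supplied by the caller
    private C current;

    ReusableConnection(Supplier<C> connector, Predicate<C> isConnected) {
        this.connector = connector;
        this.isConnected = isConnected;
    }

    /** Returns the cached connection, reconnecting first if there is none or it has dropped. */
    synchronized C get() {
        if (current == null || !isConnected.test(current)) {
            current = connector.get();
        }
        return current;
    }
}
```

A producer built this way would call get() once per exchange instead of opening a connection unconditionally, so a healthy connection is shared across exchanges and a dropped one is replaced on the next send.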