On Sat, Aug 27, 2011 at 10:19 AM, James Carman
<ja...@carmanconsulting.com> wrote:
> Well, I can tell you that it certainly didn't seem to work the way I
> need it to work.  I need a persistent connection (with automatic
> reconnects).  I also need it to be in/out, but asynchronous (the
> current incoming message may or may not correspond to the most
> recently sent message).  For now, I've resorted to just rolling my own
> solution by directly coding to the Netty API.  I will re-visit with
> Camel later I'm sure.  Thanks for your help.
>

Well, you could also consider contributing improvements to the Camel components.

The Camel community loves contributions:
http://camel.apache.org/contributing.html
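
For anyone rolling their own in the meantime: the correlation piece
discussed further down this thread (remember a callback under a
correlation id, complete it when the matching reply arrives, give up
after a timeout) can be sketched in plain Java without any Camel or
Netty types. This is only a rough sketch under assumptions:
PendingReplies, register() and complete() are made-up names, not part
of camel-netty or Netty, and it assumes Java 9+ for
CompletableFuture.orTimeout().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not a Camel or Netty class): pairs out-of-order
 * replies with their requests by correlation id.
 */
final class PendingReplies<T> {

    // One entry per request that is still waiting for its reply.
    private final ConcurrentMap<String, CompletableFuture<T>> pending =
            new ConcurrentHashMap<>();

    /**
     * Call just before writing the request. The returned future completes
     * with the reply, or fails with a TimeoutException if none arrives in time.
     */
    CompletableFuture<T> register(String correlationId, long timeout, TimeUnit unit) {
        CompletableFuture<T> future = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, future) != null) {
            throw new IllegalStateException("Duplicate correlation id: " + correlationId);
        }
        // Java 9+: fail the future once the deadline passes.
        future.orTimeout(timeout, unit);
        // Whatever the outcome, drop the entry so a late reply is ignored.
        future.whenComplete((reply, error) -> pending.remove(correlationId, future));
        return future;
    }

    /**
     * Call from the inbound side when a reply is read. Returns false if the
     * id is unknown or the caller has already timed out.
     */
    boolean complete(String correlationId, T reply) {
        CompletableFuture<T> future = pending.remove(correlationId);
        return future != null && future.complete(reply);
    }

    /** Number of requests still waiting for a reply. */
    int size() {
        return pending.size();
    }
}
```

The idea would be to call register() right before writing the request
to the long-lived channel and complete() from whatever handler reads
inbound messages, so replies may arrive in any order. A false return
from complete() is one way to notice that the original caller has
already timed out.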

> On Thu, Aug 25, 2011 at 1:16 AM, Taariq Levack <taar...@gmail.com> wrote:
>> I expect that the connection will only be closed if the header
>> NettyConstants.NETTY_CLOSE_CHANNEL_WHEN_COMPLETE is true.
>>
>> Glancing at the code I see what you mean, it's quite unlike MINA's producer
>> which checks the session to see if it's connected and reuses it, but it may
>> be that under the hood, yet further under the hood, hehe, way further down
>> into netty's ClientBootstrap and beyond, the connection is being reused. I
>> don't know for sure.
>>
>> This is from Netty front-page, "True connectionless datagram socket support
>> (since 3.1)":
>> And glancing at that bit elsewhere I think it's possible to do without this
>> sort of plumbing, but you'd have to jump into netty code or docs to confirm.
>>
>> Depending on timing and other factors I would go ahead with a POC because it
>> either works or it will work; a failing test from your POC would be most
>> welcome.
>>
>> Taariq
>>
>>
>> On Wed, Aug 24, 2011 at 12:43 PM, James Carman
>> <ja...@carmanconsulting.com> wrote:
>>
>>> I have read the source:
>>>
>>>
>>> http://svn.apache.org/repos/asf/camel/trunk/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
>>>
>>> Take a look at the process() method.  In there, there is a block of
>>> code that does:
>>>
>>>        ChannelFuture channelFuture;
>>>        final Channel channel;
>>>        try {
>>>            channelFuture = openConnection(exchange, callback);
>>>            channel = openChannel(channelFuture);
>>>        } catch (Exception e) {
>>>            exchange.setException(e);
>>>            callback.done(true);
>>>            return true;
>>>        }
>>>
>>>
>>> This is not inside an if block or anything and the openConnection()
>>> method does actually open it, it isn't just returning a
>>> previously-opened connection or anything.
>>>
>>> Perhaps I'm missing something (entirely possible), but it appears that
>>> it's opening the connection every time the process() method is called.
>>>
>>> On Tue, Aug 23, 2011 at 11:09 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> > That doesn't sound right, what have you read? Logs/docs?
>>> > And are you using keep-alive?
>>> >
>>> > Taariq
>>> >
>>> >
>>> > On 24 Aug 2011, at 12:12 AM, James Carman <ja...@carmanconsulting.com> wrote:
>>> >
>>> >> Well, it looks like the camel-netty component won't work for me.  It
>>> >> appears that it opens the connection for each exchange.  Am I reading
>>> >> that right?  What I need is a persistent connection with automatic
>>> >> reconnects.  Oh well, back to the drawing board.
>>> >>
>>> >> On Wed, Aug 17, 2011 at 7:59 AM, James Carman
>>> >> <ja...@carmanconsulting.com> wrote:
>>> >>> That's what I've been staring at! :)  Here's what I'm thinking I'm
>>> >>> going to need to write.  I need an async processor that remembers the
>>> >>> AsyncCallback and associates it with a correlation id.  Then, when
>>> >>> another exchange comes in that has the same correlation id, it will
>>> >>> lookup the previous callback and say that it's done.  I have a lot of
>>> >>> questions, though.  I've never had to get so "down and dirty" with
>>> >>> Camel before.  The components have just worked for me "off the shelf."
>>> >>>
>>> >>> 1.  Do I just copy the input message of the Exchange that comes in
>>> >>> second to the output message of the originating exchange?
>>> >>> 2.  How do I do a timeout for the original caller (the CXF request)?
>>> >>> 3.  How do I detect that the caller has timed out if they do?
>>> >>>
>>> >>> I'm sure I'll have more questions, but these are the ones off the top
>>> >>> of my head.
>>> >>>
>>> >>> On Wed, Aug 17, 2011 at 1:48 AM, Taariq Levack <taar...@gmail.com> wrote:
>>> >>>> James I think the rest of your puzzle is solved by Camel's async API,
>>> >>>> you might have to check if your task is done, maybe your
>>> >>>> requestResponse populates some collection of responses and provides
>>> >>>> some API to return the response given a correlationID.
>>> >>>> Stare at the async docs [1] a few more times and I'm sure you'll find
>>> >>>> your answer.
>>> >>>>
>>> >>>> [1] http://camel.apache.org/async.html
>>> >>>>
>>> >>>> Taariq
>>> >>>>
>>> >>>> On Tue, Aug 16, 2011 at 11:16 PM, James Carman
>>> >>>> <ja...@carmanconsulting.com> wrote:
>>> >>>>> No worries!  Thank you for your help.  It helped me understand a bit
>>> >>>>> more about how these aggregators work.  However, I still don't
>>> >>>>> understand how to take care of my problem.  I guess I'm going to have
>>> >>>>> to roll my own processor or something.
>>> >>>>>
>>> >>>>> On Tue, Aug 16, 2011 at 4:50 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> >>>>>> Hmmm.
>>> >>>>>> Maybe others can help with that if it's possible, I haven't had to
>>> >>>>>> wrestle with it.
>>> >>>>>>
>>> >>>>>> In my case it is actually a cxf service too, but it's asynchronous
>>> >>>>>> and I send the response once I have it, indicating either timeout or
>>> >>>>>> the actual response.
>>> >>>>>>
>>> >>>>>> Sorry I responded to your question without going back to see your
>>> >>>>>> other posts.
>>> >>>>>>
>>> >>>>>> Taariq
>>> >>>>>>
>>> >>>>>> On 16 Aug 2011, at 10:33 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>> >>>>>>
>>> >>>>>>> In my case, the originating request comes from CXF.  How do I send the
>>> >>>>>>> aggregated response back to CXF?
>>> >>>>>>>
>>> >>>>>>> On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> >>>>>>>> The consumer that handles the aggregated/timed-out request or
>>> >>>>>>>> response.
>>> >>>>>>>>
>>> >>>>>>>> I have to resend a few times if it's the request, I simply feed it
>>> >>>>>>>> back into "direct:socketRequestRoute" with the header for the number
>>> >>>>>>>> of retry attempts incremented.
>>> >>>>>>>> If it's the response I can forward to some process.
>>> >>>>>>>>
>>> >>>>>>>> Taariq
>>> >>>>>>>>
>>> >>>>>>>> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>> >>>>>>>>
>>> >>>>>>>>> What's listening on the:
>>> >>>>>>>>>
>>> >>>>>>>>> to("direct:requestResponse")
>>> >>>>>>>>>
>>> >>>>>>>>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> >>>>>>>>>> Sure
>>> >>>>>>>>>>
>>> >>>>>>>>>> You can of course solve what I've described many ways, but I'll
>>> >>>>>>>>>> explain using 3 routes as that's what I used.
>>> >>>>>>>>>>
>>> >>>>>>>>>> This first route is the main route I mentioned earlier, so you send
>>> >>>>>>>>>> your socket messages here and it's multicast to both the aggregator
>>> >>>>>>>>>> and to the socket.
>>> >>>>>>>>>>
>>> >>>>>>>>>> from("direct:socketRequestRoute").multicast()
>>> >>>>>>>>>>     .to("direct:requestResponseAggregator", "someOutboundSocketEndpoint");
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> This next route will aggregate; both requests and responses are sent
>>> >>>>>>>>>> here as you envisaged.
>>> >>>>>>>>>> from("direct:requestResponseAggregator")
>>> >>>>>>>>>>                .aggregate(header("someCorrellationId"), requestResponseAggregator)
>>> >>>>>>>>>>                .completionSize(2)
>>> >>>>>>>>>>                .completionTimeout(5000)
>>> >>>>>>>>>>                .to("direct:requestResponse");
>>> >>>>>>>>>> // Here you can send the "aggregated" message. In my case it's only the
>>> >>>>>>>>>> // response I forward, unless there's a timeout; then I forward the
>>> >>>>>>>>>> // request of course.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Finally the route that consumes the socket responses.
>>> >>>>>>>>>>
>>> >>>>>>>>>> from(someInboundSocketEndpoint).processRef("headerEnricher")
>>> >>>>>>>>>>     .to("direct:requestResponseAggregator");
>>> >>>>>>>>>> // This headerEnricher doesn't have to be a processor, you have many
>>> >>>>>>>>>> // options to add a header.
>>> >>>>>>>>>>
>>> >>>>>>>>>> If that's not clear feel free to ask.
>>> >>>>>>>>>>
>>> >>>>>>>>>> Taariq
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>> >>>>>>>>>> <ja...@carmanconsulting.com> wrote:
>>> >>>>>>>>>>> Care to share an example?  I'm not picturing it.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> >>>>>>>>>>>> Hi James
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> I did that too for what it's worth.
>>> >>>>>>>>>>>> I send the message to a route that forwards to both the
>>> >>>>>>>>>>>> aggregator and to the socket.
>>> >>>>>>>>>>>> When the response comes in I use an enricher to add the ID to
>>> >>>>>>>>>>>> the headers and then forward to the aggregator.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Taariq
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> wrote:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>> Willem,
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>> >>>>>>>>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>> >>>>>>>>>>>>> "server" on the other end of this situation.  I thought about using an
>>> >>>>>>>>>>>>> aggregator to make sure the response gets matched up with the request
>>> >>>>>>>>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>> >>>>>>>>>>>>> responses together into one, it would just make sure it matches the
>>> >>>>>>>>>>>>> correct response with its request.  Does this sound like a valid
>>> >>>>>>>>>>>>> approach?  If so, how the heck do I go about it? :)
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> Thanks,
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> James
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com> wrote:
>>> >>>>>>>>>>>>>> Hi James,
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> The Camel async processing engine already provides what you want.
>>> >>>>>>>>>>>>>> You can take a look at the camel-cxf code [1][2] for some examples.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> [1] http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>> >>>>>>>>>>>>>> [2] http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian <hzbar...@gmail.com> wrote:
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Hi James,
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> I hope I understand your scenario correctly. Here are a few
>>> >>>>>>>>>>>>>>>> thoughts. I assume you want to use camel-netty [1] to send messages
>>> >>>>>>>>>>>>>>>> to your server (if you have your own code that does that, you can
>>> >>>>>>>>>>>>>>>> use it too, but you'd have to write your own Processor or
>>> >>>>>>>>>>>>>>>> Component). If I understand correctly, your scenario is converting
>>> >>>>>>>>>>>>>>>> a 2x in-only to a 1x in-out async MEP. You should then treat your
>>> >>>>>>>>>>>>>>>> exchange as an async in-out and let your framework (Camel)
>>> >>>>>>>>>>>>>>>> decompose it and compose it back again. I would not keep threads
>>> >>>>>>>>>>>>>>>> blocked, so I believe your best bet is using the Camel async
>>> >>>>>>>>>>>>>>>> messaging [2] and Futures (look at the examples using asyncSend*
>>> >>>>>>>>>>>>>>>> and asyncCallback*). The issue is that Camel is stateless, so
>>> >>>>>>>>>>>>>>>> you'll need a correlationId, which you must have already, and
>>> >>>>>>>>>>>>>>>> something to keep your state. A good bet would be jms [3], or you
>>> >>>>>>>>>>>>>>>> could write your own. If you used jms you would need to use both a
>>> >>>>>>>>>>>>>>>> correlationId and a replyTo queue.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>> >>>>>>>>>>>>>>>> from("netty:input").to("jms:replyTo-queue");
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>> >>>>>>>>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>> >>>>>>>>>>>>>>> course).  So, I would need to either block the request thread, waiting
>>> >>>>>>>>>>>>>>> for a reply, or perhaps check out the new Servlet 3.0 asynchronous
>>> >>>>>>>>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>> >>>>>>>>>>>>>>> fewer HTTP request threads) to do more of a continuation thing.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>> >>>>>>>>>>>>>>> server process just echoes it back in the response message.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> You may have to play a bit with the correlationId and if you cannot
>>> >>>>>>>>>>>>>>>> use the same you can do a second transformation/correlation using a
>>> >>>>>>>>>>>>>>>> claim-check sort of pattern. If you don't want to use jms you can
>>> >>>>>>>>>>>>>>>> implement your own (in memory) persistence and correlation. You can
>>> >>>>>>>>>>>>>>>> also use a resequencer [4] if you want to enforce the order. If you
>>> >>>>>>>>>>>>>>>> use asyncCallback, you get the replies when they become available,
>>> >>>>>>>>>>>>>>>> and you can control that.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>> >>>>>>>>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>> >>>>>>>>>>>>>>> message comes in after another, but it can be processed faster, so be
>>> >>>>>>>>>>>>>>> it.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought,
>>> >>>>>>>>>>>>>>>> but I hope this helps.
>>> >>>>>>>>>>>>>>>> Hadrian
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> --
>>> >>>>>>>>>>>>>> Willem
>>> >>>>>>>>>>>>>> ----------------------------------
>>> >>>>>>>>>>>>>> FuseSource
>>> >>>>>>>>>>>>>> Web: http://www.fusesource.com
>>> >>>>>>>>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>> >>>>>>>>>>>>>>         http://jnn.javaeye.com (Chinese)
>>> >>>>>>>>>>>>>> Twitter: willemjiang
>>> >>>>>>>>>>>>>> Weibo: willemjiang
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>
>>> >>>>>>
>>> >>>>>
>>> >>>>
>>> >>>
>>> >
>>>
>>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cib...@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/
