In my case, the originating request comes from CXF.  How do I send the
aggregated response back to CXF?

On Tue, Aug 16, 2011 at 4:29 PM, Taariq Levack <taar...@gmail.com> wrote:
> The consumer that handles the aggregated/timed-out request or response.
>
> I have to resend a few times if it's the request, I simply feed it back into 
> "direct:socketRequestRoute" with the header for the number of retry attempts 
> incremented.
> If it's the response I can forward to some process.
>
> Taariq
>
> On 16 Aug 2011, at 10:18 PM, James Carman <ja...@carmanconsulting.com> wrote:
>
>> What's listening on the:
>>
>> to("direct:requestResponse")
>>
>> On Tue, Aug 16, 2011 at 3:56 PM, Taariq Levack <taar...@gmail.com> wrote:
>>> Sure
>>>
>>> You can of course solve what I've described many ways, but I'll
>>> explain using 3 routes as that's what I used.
>>>
>>> This first route is the main route I mentioned earlier, so you send
>>> your socket messages here and it's multicast to both the aggregator
>>> and to the socket.
>>>
>>> from("direct:socketRequestRoute").multicast().to("direct:requestResponseAggregator",
>>>  "someOutboundSocketEndpoint");
>>>
>>>
>>> This next route will aggregate, both requests and responses are sent
>>> here as you envisaged.
>>> from("direct:requestResponseAggregator").
>>>                .aggregate(header("someCorrellationId"),
>>> requestResponseAggregator)
>>>                .completionSize(2)
>>>                .completionTimeout(5000)
>>>                .to("direct:requestResponse"); //Here you can send the
>>> "aggregated" message, in my case it's only the response I forward
>>> unless there's a timeout, then I forward the request of course.
>>>
>>> Finally the route that consumes the socket responses.
>>> from(someInboundSocketEndpoint).processRef("headerEnricher").to("direct:requestResponseAggregator");
>>>   //this headerEnricher doesn't have to be a processor, you have many
>>> options to add a header.
>>>
>>> If that's not clear feel free to ask.
>>>
>>> Taariq
>>>
>>>
>>> On Tue, Aug 16, 2011 at 9:30 PM, James Carman
>>> <ja...@carmanconsulting.com> wrote:
>>>> Care to share an example?  I'm not picturing it.
>>>>
>>>> On Tue, Aug 16, 2011 at 3:23 PM, Taariq Levack <taar...@gmail.com> wrote:
>>>>> Hi James
>>>>>
>>>>> I did that too for what it's worth.
>>>>> I send the message to a route that forwards to both the aggregator and to 
>>>>> the socket.
>>>>> When the response comes in I use an enricher to add the ID to the headers 
>>>>> and then forward to the aggregator.
>>>>>
>>>>> Taariq
>>>>>
>>>>> On 16 Aug 2011, at 8:55 PM, James Carman <ja...@carmanconsulting.com> 
>>>>> wrote:
>>>>>
>>>>>> Willem,
>>>>>>
>>>>>> Thank you for your help.  I don't think this is doing exactly what I
>>>>>> need, though.  The real trick here is the asynchronous nature of the
>>>>>> "server" on the other end of this situation.  I thought about using an
>>>>>> aggregator to make sure the response gets matched up with the request
>>>>>> using a correlation id.  The aggregator wouldn't aggregate multiple
>>>>>> responses together into one, it would just make sure it matches the
>>>>>> correct response with its request.  Does this sound like a valid
>>>>>> approach?  If so, how the heck do I go about it? :)
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> James
>>>>>>
>>>>>> On Sun, Aug 7, 2011 at 9:03 PM, Willem Jiang <willem.ji...@gmail.com> 
>>>>>> wrote:
>>>>>>> Hi James,
>>>>>>>
>>>>>>> Camel async process engine already provides the way that you want.
>>>>>>> You can take a look at the camel-cxf code[1][2] for some example.
>>>>>>>
>>>>>>> [1]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java?view=markup
>>>>>>> [2]http://svn.apache.org/viewvc/camel/trunk/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfProducer.java?view=markup
>>>>>>>
>>>>>>> On 8/7/11 1:29 AM, James Carman wrote:
>>>>>>>>
>>>>>>>> On Sat, Aug 6, 2011 at 10:33 AM, Zbarcea Hadrian<hzbar...@gmail.com>
>>>>>>>>  wrote:
>>>>>>>>>
>>>>>>>>> Hi James,
>>>>>>>>>
>>>>>>>>> I hope I understand your scenario correctly. Here are a few thoughts. 
>>>>>>>>> I
>>>>>>>>> assume want to use camel-netty [1] to send messages to your sever (if 
>>>>>>>>> you
>>>>>>>>> have your own code that does that, you can use it too, but you'd have 
>>>>>>>>> to
>>>>>>>>> write your own Processor or Component). Iiuic, your scenario is 
>>>>>>>>> converting a
>>>>>>>>> 2x in-only to a 1x in-out async mep. You should then treat your 
>>>>>>>>> exchange as
>>>>>>>>> an async in-out and let your framework (Camel) decompose it and 
>>>>>>>>> compose it
>>>>>>>>> back again. I would not keep threads blocked so I believe your best 
>>>>>>>>> bet is
>>>>>>>>> using the Camel async messaging [2] and Futures (look at the examples 
>>>>>>>>> using
>>>>>>>>> asyncSend* and asyncCallback*). The issue is that Camel is stateless 
>>>>>>>>> so
>>>>>>>>> you'll need a correlationId, which you must have already and 
>>>>>>>>> something to
>>>>>>>>> keep your state. A good bet would be jms [3], or you could write your 
>>>>>>>>> own.
>>>>>>>>> If you used jms you would need to use both a correlationId and a 
>>>>>>>>> replyTo
>>>>>>>>> queue.
>>>>>>>>>
>>>>>>>>> from("jms:request-queue").to("netty:output?=correlationId");
>>>>>>>>> from("netty:input).to("jms:replyTo-queue")
>>>>>>>>>
>>>>>>>>
>>>>>>>> Perhaps a bit more information might be appropriate here.  Eventually,
>>>>>>>> I'd like to "expose" this route via web services (using CXF of
>>>>>>>> course).  So, I would need to either block the request thread, waiting
>>>>>>>> for a reply or perhaps check out the new Servlet 3.0 asynchronous
>>>>>>>> processing stuff (I'm thinking this might help us get more done with
>>>>>>>> less http request threads) to do more of a continuation thing.
>>>>>>>>
>>>>>>>> We already have a correlation id.  The "protocol" requires one and the
>>>>>>>> server process just echos it back in the response message.
>>>>>>>>
>>>>>>>>> You may have to play a bit with the correlationId and if you cannot 
>>>>>>>>> use
>>>>>>>>> the same you can do a second transformation/correlation using a 
>>>>>>>>> claim-check
>>>>>>>>> sort of pattern. If you don't want to use jms you can implement your 
>>>>>>>>> own (in
>>>>>>>>> memory) persistence and correlation. You can also use a resequencer 
>>>>>>>>> [4] if
>>>>>>>>> you want to enforce the order. If you use asyncCallback, you get the 
>>>>>>>>> replies
>>>>>>>>> when they become available, and you can control that.
>>>>>>>>>
>>>>>>>>
>>>>>>>> I don't think a resequencer is necessary.  I don't want to guarantee
>>>>>>>> the ordering.  I'm mostly interested in throughput here.  So, if a
>>>>>>>> message comes in after another, but it can be processed faster, so be
>>>>>>>> it.
>>>>>>>>
>>>>>>>>> It's an interesting scenario, I'll definitely give it more thought, 
>>>>>>>>> but I
>>>>>>>>> hope this helps.
>>>>>>>>> Hadrian
>>>>>>>>>
>>>>>>>>
>>>>>>>> You have been very helpful.  Thank you for taking the time!
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Willem
>>>>>>> ----------------------------------
>>>>>>> FuseSource
>>>>>>> Web: http://www.fusesource.com
>>>>>>> Blog:    http://willemjiang.blogspot.com (English)
>>>>>>>         http://jnn.javaeye.com (Chinese)
>>>>>>> Twitter: willemjiang
>>>>>>> Weibo: willemjiang
>>>>>>>
>>>>>
>>>>
>>>
>

Reply via email to