Author: arnaudsimon
Date: Mon Oct 15 03:59:02 2007
New Revision: 584728
URL: http://svn.apache.org/viewvc?rev=584728&view=rev
Log:
Changed handling of replyTo
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=584728&r1=584727&r2=584728&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Mon Oct 15 03:59:02 2007
@@ -147,17 +147,15 @@
getSession().getAMQConnection().exceptionReceived(e);
}
Struct[] headers = {message.getMessageProperties(),
message.getDeliveryProperties()};
- // if there is a replyto destination then we need to request the
exchange info
+ // if there is a replyto destination then we need to request the
exchange info
ReplyTo replyTo = message.getMessageProperties().getReplyTo();
if (replyTo != null &&
replyTo.getExchangeName() != null &&
!replyTo.getExchangeName().equals(""))
{
- Future<ExchangeQueryResult> future = ((AMQSession_0_10)
getSession()).getQpidSession()
-
.exchangeQuery(message.getMessageProperties().getReplyTo().getExchangeName());
- ExchangeQueryResult res = future.get();
//
<exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
- String replyToUrl = res.getType() + "://" +
message.getMessageProperties().getReplyTo()
+ // the exchange class will be set later from within the session
thread
+ String replyToUrl = message.getMessageProperties().getReplyTo()
.getExchangeName() + "/" +
message.getMessageProperties().getReplyTo()
.getRoutingKey() + "/" +
message.getMessageProperties().getReplyTo().getRoutingKey();
newMessage.setReplyToURL(replyToUrl);
@@ -199,6 +197,21 @@
super.postDeliver(msg);
}
+ void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+ {
+ // if there is a replyto destination then we need to request the
exchange info
+ String replyToURL = messageFrame.getReplyToURL() ;
+ if (replyToURL != null && ! replyToURL.equals(""))
+ {
+ String exchangeName = replyToURL.substring(0,
replyToURL.indexOf('/'));
+ Future<ExchangeQueryResult> future = ((AMQSession_0_10)
getSession()).getQpidSession().exchangeQuery(exchangeName);
+ ExchangeQueryResult res = future.get();
+ //
<exch_class>://<exch_name>/[<destination>]/[<queue>]?<option>='<value>'[,<option>='<value>']*
+ String replyToUrl = res.getType() + "://" + replyToURL;
+ ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
+ }
+ super.notifyMessage(messageFrame, channelId);
+ }
public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
UnprocessedMessage<Struct[], ByteBuffer> messageFrame) throws
Exception