Author: rajith
Date: Wed Jan 16 13:57:47 2008
New Revision: 612581
URL: http://svn.apache.org/viewvc?rev=612581&view=rev
Log:
Please refer JIRA's 739,740 and 741 for more information about the issues.
AMQDestination
Renamed the destinationName to routingKey as it is incorrectly used.
Also modified it to recognize fannout exchange
AMQQueue
Modified to return the proper routing key to support situations where the queue
name and the routing key is different.
BasicMessageProducer_0_10.java
Added a temp hack to interoperate with python. The bug is in python and it
needs to be fixed.
Basically python relies on the content length to pass the content frames
properly.
So I added a line to calculate the content length and sets it in the message
properties.
The rest was modified to reflect the change done in AMQDestination.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java?rev=612581&r1=612580&r2=612581&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java
Wed Jan 16 13:57:47 2008
@@ -40,8 +40,6 @@
protected final AMQShortString _exchangeClass;
- protected final AMQShortString _destinationName;
-
protected final boolean _isDurable;
protected final boolean _isExclusive;
@@ -50,6 +48,8 @@
private AMQShortString _queueName;
+ private AMQShortString _routingKey;
+
private String _url;
private AMQShortString _urlAsShortString;
@@ -73,17 +73,17 @@
{
_exchangeName = binding.getExchangeName();
_exchangeClass = binding.getExchangeClass();
- _destinationName = binding.getDestinationName();
_isExclusive =
Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
_isAutoDelete =
Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
_isDurable =
Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
_queueName = binding.getQueueName() == null ? null : new
AMQShortString(binding.getQueueName());
+ _routingKey = binding.getRoutingKey() == null ? null : new
AMQShortString(binding.getRoutingKey());
}
- protected AMQDestination(AMQShortString exchangeName, AMQShortString
exchangeClass, AMQShortString destinationName, AMQShortString queueName)
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString
exchangeClass, AMQShortString routingKey, AMQShortString queueName)
{
- this(exchangeName, exchangeClass, destinationName, false, false,
queueName);
+ this(exchangeName, exchangeClass, routingKey, false, false, queueName);
}
protected AMQDestination(AMQShortString exchangeName, AMQShortString
exchangeClass, AMQShortString destinationName)
@@ -91,22 +91,23 @@
this(exchangeName, exchangeClass, destinationName, false, false, null);
}
- protected AMQDestination(AMQShortString exchangeName, AMQShortString
exchangeClass, AMQShortString destinationName, boolean isExclusive,
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString
exchangeClass, AMQShortString routingKey, boolean isExclusive,
boolean isAutoDelete, AMQShortString queueName)
{
- this(exchangeName, exchangeClass, destinationName, isExclusive,
isAutoDelete, queueName, false);
+ this(exchangeName, exchangeClass, routingKey, isExclusive,
isAutoDelete, queueName, false);
}
- protected AMQDestination(AMQShortString exchangeName, AMQShortString
exchangeClass, AMQShortString destinationName, boolean isExclusive,
+ protected AMQDestination(AMQShortString exchangeName, AMQShortString
exchangeClass, AMQShortString routingKey, boolean isExclusive,
boolean isAutoDelete, AMQShortString queueName,
boolean isDurable)
{
- if (destinationName == null)
+ // If used with a fannout exchange, the routing key can be null
+ if ( !ExchangeDefaults.FANOUT_EXCHANGE_CLASS.equals(exchangeClass) &&
routingKey == null)
{
- throw new IllegalArgumentException("Destination exchange must not
be null");
+ throw new IllegalArgumentException("routingKey exchange must not
be null");
}
if (exchangeName == null)
{
- throw new IllegalArgumentException("Exchange exchange must not be
null");
+ throw new IllegalArgumentException("Exchange name must not be
null");
}
if (exchangeClass == null)
{
@@ -114,7 +115,7 @@
}
_exchangeName = exchangeName;
_exchangeClass = exchangeClass;
- _destinationName = destinationName;
+ _routingKey = routingKey;
_isExclusive = isExclusive;
_isAutoDelete = isAutoDelete;
_queueName = queueName;
@@ -155,11 +156,6 @@
return ExchangeDefaults.DIRECT_EXCHANGE_CLASS.equals(_exchangeClass);
}
- public AMQShortString getDestinationName()
- {
- return _destinationName;
- }
-
public String getQueueName()
{
return _queueName == null ? null : _queueName.toString();
@@ -170,8 +166,6 @@
return _queueName;
}
-
-
public void setQueueName(AMQShortString queueName)
{
@@ -182,7 +176,10 @@
_byteEncoding = null;
}
- public abstract AMQShortString getRoutingKey();
+ public AMQShortString getRoutingKey()
+ {
+ return _routingKey;
+ }
public boolean isExclusive()
{
@@ -213,7 +210,7 @@
}
public String toURL()
- {
+ {
String url = _url;
if(url == null)
{
@@ -225,14 +222,7 @@
sb.append("://");
sb.append(_exchangeName);
- sb.append('/');
-
- if (_destinationName != null)
- {
- sb.append(_destinationName);
- }
-
- sb.append('/');
+ sb.append("//");
if (_queueName != null)
{
@@ -278,7 +268,7 @@
{
int size = _exchangeClass.length() + 1 +
_exchangeName.length() + 1 +
- (_destinationName == null ? 0 :
_destinationName.length()) + 1 +
+ 0 + // in place of the destination name
(_queueName == null ? 0 : _queueName.length()) + 1 +
1;
encoding = new byte[size];
@@ -286,14 +276,9 @@
pos = _exchangeClass.writeToByteArray(encoding, pos);
pos = _exchangeName.writeToByteArray(encoding, pos);
- if(_destinationName == null)
- {
- encoding[pos++] = (byte)0;
- }
- else
- {
- pos = _destinationName.writeToByteArray(encoding,pos);
- }
+
+ encoding[pos++] = (byte)0;
+
if(_queueName == null)
{
encoding[pos++] = (byte)0;
@@ -337,10 +322,6 @@
final AMQDestination that = (AMQDestination) o;
- if (!_destinationName.equals(that._destinationName))
- {
- return false;
- }
if (!_exchangeClass.equals(that._exchangeClass))
{
return false;
@@ -363,7 +344,7 @@
int result;
result = _exchangeName.hashCode();
result = 29 * result + _exchangeClass.hashCode();
- result = 29 * result + _destinationName.hashCode();
+ //result = 29 * result + _destinationName.hashCode();
if (_queueName != null)
{
result = 29 * result + _queueName.hashCode();
@@ -386,7 +367,7 @@
{
AMQShortString exchangeClass;
AMQShortString exchangeName;
- AMQShortString destinationName;
+ AMQShortString routingKey;
AMQShortString queueName;
boolean isDurable;
boolean isExclusive;
@@ -397,8 +378,8 @@
pos+= exchangeClass.length() + 1;
exchangeName =
AMQShortString.readFromByteArray(byteEncodedDestination, pos);
pos+= exchangeName.length() + 1;
- destinationName =
AMQShortString.readFromByteArray(byteEncodedDestination, pos);
- pos+= (destinationName == null ? 0 : destinationName.length()) + 1;
+ routingKey = AMQShortString.readFromByteArray(byteEncodedDestination,
pos);
+ pos+= (routingKey == null ? 0 : routingKey.length()) + 1;
queueName = AMQShortString.readFromByteArray(byteEncodedDestination,
pos);
pos+= (queueName == null ? 0 : queueName.length()) + 1;
int options = byteEncodedDestination[pos];
@@ -408,15 +389,15 @@
if (exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
- return new
AMQQueue(exchangeName,destinationName,queueName,isExclusive,isAutoDelete,isDurable);
+ return new
AMQQueue(exchangeName,routingKey,queueName,isExclusive,isAutoDelete,isDurable);
}
else if (exchangeClass.equals(ExchangeDefaults.TOPIC_EXCHANGE_CLASS))
{
- return new
AMQTopic(exchangeName,destinationName,isAutoDelete,queueName,isDurable);
+ return new
AMQTopic(exchangeName,routingKey,isAutoDelete,queueName,isDurable);
}
else if (exchangeClass.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
{
- return new AMQHeadersExchange(destinationName);
+ return new AMQHeadersExchange(routingKey);
}
else
{
@@ -442,6 +423,10 @@
else if (type.equals(ExchangeDefaults.HEADERS_EXCHANGE_CLASS))
{
return new AMQHeadersExchange(binding);
+ }
+ else if (type.equals(ExchangeDefaults.FANOUT_EXCHANGE_CLASS))
+ {
+ return new AMQQueue(binding);
}
else
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java?rev=612581&r1=612580&r2=612581&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQHeadersExchange.java
Wed Jan 16 13:57:47 2008
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -42,11 +42,6 @@
public AMQHeadersExchange(AMQShortString queueName)
{
super(queueName, ExchangeDefaults.HEADERS_EXCHANGE_CLASS, queueName,
true, true, null);
- }
-
- public AMQShortString getRoutingKey()
- {
- return getDestinationName();
}
public boolean isNameRequired()
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java?rev=612581&r1=612580&r2=612581&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQQueue.java
Wed Jan 16 13:57:47 2008
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,7 +21,6 @@
package org.apache.qpid.client;
import javax.jms.Queue;
-import javax.jms.Connection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
@@ -82,7 +81,7 @@
public AMQQueue(AMQConnection connection, String name, boolean temporary)
{
- this(connection.getDefaultQueueExchangeName(), new
AMQShortString(name),temporary);
+ this(connection.getDefaultQueueExchangeName(), new
AMQShortString(name),temporary);
}
@@ -111,39 +110,39 @@
// queue name is set to null indicating that the broker assigns a name
in the case of temporary queues
// temporary queues are typically used as response queues
this(exchangeName, name, temporary?null:name, temporary, temporary,
!temporary);
-
+
}
/**
* Create a reference to a queue. Note this does not actually imply the
queue exists.
- * @param destinationName the queue name
+ * @param exchangeName the exchange name we want to send the message to
+ * @param routingKey the routing key
* @param queueName the queue name
* @param exclusive true if the queue should only permit a single consumer
* @param autoDelete true if the queue should be deleted automatically
when the last consumers detaches
*/
- public AMQQueue(AMQShortString exchangeName, AMQShortString
destinationName, AMQShortString queueName, boolean exclusive, boolean
autoDelete)
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey,
AMQShortString queueName, boolean exclusive, boolean autoDelete)
{
- this(exchangeName, destinationName, queueName, exclusive, autoDelete,
false);
+ this(exchangeName, routingKey, queueName, exclusive, autoDelete,
false);
}
- public AMQQueue(AMQShortString exchangeName, AMQShortString
destinationName, AMQShortString queueName, boolean exclusive, boolean
autoDelete, boolean durable)
+ public AMQQueue(AMQShortString exchangeName, AMQShortString routingKey,
AMQShortString queueName, boolean exclusive, boolean autoDelete, boolean
durable)
{
- super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
destinationName, exclusive,
+ super(exchangeName, ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
routingKey, exclusive,
autoDelete, queueName, durable);
}
-
-
public AMQShortString getRoutingKey()
{
- if( getDestinationName() != null && ! getDestinationName().equals(""))
+ //return getAMQQueueName();
+ if (getAMQQueueName() != null &&
getAMQQueueName().equals(super.getRoutingKey()))
{
- return getDestinationName();
+ return getAMQQueueName();
}
else
{
- return getAMQQueueName();
+ return super.getRoutingKey();
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=612581&r1=612580&r2=612581&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Wed Jan 16 13:57:47 2008
@@ -738,7 +738,7 @@
AMQShortString topicName;
if (topic instanceof AMQTopic)
{
- topicName = ((AMQTopic) topic).getDestinationName();
+ topicName = ((AMQTopic) topic).getRoutingKey();
}
else
{
@@ -1039,7 +1039,7 @@
}
public abstract TemporaryQueue createTemporaryQueue() throws JMSException;
-
+
public TemporaryTopic createTemporaryTopic() throws JMSException
{
checkNotClosed();
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=612581&r1=612580&r2=612581&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Wed Jan 16 13:57:47 2008
@@ -526,9 +526,9 @@
try
{
// this is done so that we can produce to a temporary queue beofre
we create a consumer
- sendCreateQueue(result.getDestinationName(),
result.isAutoDelete(), result.isDurable(), result.isExclusive());
- sendQueueBind(result.getDestinationName(),
result.getDestinationName(), new FieldTable(), result.getExchangeName());
- result.setQueueName(result.getDestinationName());
+ sendCreateQueue(result.getRoutingKey(), result.isAutoDelete(),
result.isDurable(), result.isExclusive());
+ sendQueueBind(result.getRoutingKey(), result.getRoutingKey(), new
FieldTable(), result.getExchangeName());
+ result.setQueueName(result.getRoutingKey());
}
catch (Exception e)
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=612581&r1=612580&r2=612581&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
Wed Jan 16 13:57:47 2008
@@ -74,7 +74,7 @@
public static AMQTopic createDurableTopic(AMQTopic topic, String
subscriptionName, AMQConnection connection)
throws JMSException
{
- return new AMQTopic(topic.getExchangeName(),
topic.getDestinationName(), false,
+ return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(),
false,
getDurableTopicQueueName(subscriptionName,
connection),
true);
}
@@ -86,12 +86,12 @@
public String getTopicName() throws JMSException
{
- return super.getDestinationName().toString();
+ return super.getRoutingKey().toString();
}
public AMQShortString getRoutingKey()
{
- return getDestinationName();
+ return getRoutingKey();
}
public boolean isNameRequired()
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java?rev=612581&r1=612580&r2=612581&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQUndefinedDestination.java
Wed Jan 16 13:57:47 2008
@@ -33,11 +33,6 @@
super(exchange, UNKNOWN_EXCHANGE_CLASS, routingKey, queueName);
}
- public AMQShortString getRoutingKey()
- {
- return getDestinationName();
- }
-
public boolean isNameRequired()
{
return getAMQQueueName() == null;
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=612581&r1=612580&r2=612581&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
Wed Jan 16 13:57:47 2008
@@ -131,12 +131,21 @@
origMessage.setJMSMessageID(message.getJMSMessageID());
origMessage.setJMSDeliveryMode(deliveryMode);
}
+
BasicContentHeaderProperties contentHeaderProperties =
message.getContentHeaderProperties();
if (contentHeaderProperties.reset())
{
// set the application properties
message.get010Message().getMessageProperties()
.setContentType(contentHeaderProperties.getContentType().toString());
+
+ /* Temp hack to get the JMS client to interoperate with the python
client
+ as it relies on content length to determine the boundaries for
+ message framesets. The python client should be fixed.
+ */
+
message.get010Message().getMessageProperties().setContentLength(message.getContentLength());
+
+
AMQShortString type = contentHeaderProperties.getType();
if (type != null)
{
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=612581&r1=612580&r2=612581&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Wed Jan 16 13:57:47 2008
@@ -722,6 +722,11 @@
}
}
+ public int getContentLength()
+ {
+ return _data.remaining();
+ }
+
public void setConsumer(BasicMessageConsumer basicMessageConsumer)
{
_consumer = basicMessageConsumer;
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java?rev=612581&r1=612580&r2=612581&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/url/AMQBindingURL.java
Wed Jan 16 13:57:47 2008
@@ -259,7 +259,14 @@
{
if (_exchangeClass.equals(ExchangeDefaults.DIRECT_EXCHANGE_CLASS))
{
- return getQueueName();
+ if (containsOption(BindingURL.OPTION_ROUTING_KEY))
+ {
+ return new AMQShortString(getOption(OPTION_ROUTING_KEY));
+ }
+ else
+ {
+ return getQueueName();
+ }
}
if (containsOption(BindingURL.OPTION_ROUTING_KEY))