Author: arnaudsimon
Date: Thu Jan 17 05:59:52 2008
New Revision: 612820
URL: http://svn.apache.org/viewvc?rev=612820&view=rev
Log:
see qpid-742
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=612820&r1=612819&r2=612820&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Jan 17 05:59:52 2008
@@ -423,6 +423,15 @@
}, _connection).execute();
}
+
+ public void addBindingKey(BasicMessageConsumer consumer, AMQDestination amqd, String routingKey) throws AMQException
+ {
+ if( consumer.getQueuename() != null)
+ {
+ bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), new FieldTable(), amqd.getExchangeName());
+ }
+ }
+
public abstract void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
final AMQShortString exchangeName) throws AMQException, FailoverException;
@@ -2155,6 +2164,9 @@
declareExchange(amqd, protocolHandler, false);
AMQShortString queueName = declareQueue(amqd, protocolHandler);
+
+ // store the consumer queue name
+ consumer.setQueuename(queueName);
// bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
bindQueue(queueName, amqd.getRoutingKey(), consumer.getRawSelectorFieldTable(), amqd.getExchangeName());
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java?rev=612820&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java Thu Jan 17 05:59:52 2008
@@ -0,0 +1,11 @@
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+
+import javax.jms.Topic;
+
+public interface AMQTopicSubscriber
+{
+
+ void addBindingKey(Topic topic, String bindingKey) throws AMQException;
+}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=612820&r1=612819&r2=612820&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Jan 17 05:59:52 2008
@@ -41,6 +41,7 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
+import org.apache.qpid.AMQException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,6 +144,13 @@
*/
private Thread _receivingThread;
+
+ /**
+ * Used to store this consumer queue name
+ * Useful when more than one binding key should be used
+ */
+ private AMQShortString _queuename;
+
/**
* autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
* on the queue. This is used for queue browsing.
@@ -969,5 +977,20 @@
public void stop()
{
// do nothing as this is a 0_10 feature
+ }
+
+ public AMQShortString getQueuename()
+ {
+ return _queuename;
+ }
+
+ public void setQueuename(AMQShortString queuename)
+ {
+ this._queuename = queuename;
+ }
+
+ public void addBindingKey(AMQDestination amqd, String routingKey) throws AMQException
+ {
+ _session.addBindingKey(this,amqd,routingKey);
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?rev=612820&r1=612819&r2=612820&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Thu Jan 17 05:59:52 2008
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQException;
+
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
@@ -30,7 +32,7 @@
* Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
*
*/
-class TopicSubscriberAdaptor implements TopicSubscriber
+class TopicSubscriberAdaptor implements TopicSubscriber, AMQTopicSubscriber
{
private final Topic _topic;
private final BasicMessageConsumer _consumer;
@@ -123,4 +125,8 @@
return _consumer;
}
+ public void addBindingKey(Topic topic, String bindingKey) throws AMQException
+ {
+ _consumer.addBindingKey((AMQDestination) topic, bindingKey);
+ }
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=612820&r1=612819&r2=612820&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu Jan 17 05:59:52 2008
@@ -53,7 +53,7 @@
protected ByteBuffer _data;
private boolean _readableProperties = false;
protected boolean _readableMessage = false;
- protected boolean _changedData;
+ protected boolean _changedData = true;
private Destination _destination;
private JMSHeaderAdapter _headerAdapter;
private BasicMessageConsumer _consumer;