Author: arnaudsimon
Date: Thu Jan 17 05:59:52 2008
New Revision: 612820

URL: http://svn.apache.org/viewvc?rev=612820&view=rev
Log:
see qpid-742

Added:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=612820&r1=612819&r2=612820&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Thu Jan 17 05:59:52 2008
@@ -423,6 +423,15 @@
         }, _connection).execute();
     }
 
+
+    public void addBindingKey(BasicMessageConsumer consumer, AMQDestination 
amqd, String routingKey) throws AMQException
+    {
+        if( consumer.getQueuename() != null)
+        {
+            bindQueue(consumer.getQueuename(), new AMQShortString(routingKey), 
new FieldTable(), amqd.getExchangeName());
+        }
+    }
+
     public abstract void sendQueueBind(final AMQShortString queueName, final 
AMQShortString routingKey, final FieldTable arguments,
             final AMQShortString exchangeName) throws AMQException, 
FailoverException;
 
@@ -2155,6 +2164,9 @@
         declareExchange(amqd, protocolHandler, false);
 
         AMQShortString queueName = declareQueue(amqd, protocolHandler);
+
+        // store the consumer queue name
+        consumer.setQueuename(queueName);
 
         // bindQueue(amqd, queueName, protocolHandler, 
consumer.getRawSelectorFieldTable());
         bindQueue(queueName, amqd.getRoutingKey(), 
consumer.getRawSelectorFieldTable(), amqd.getExchangeName());

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java?rev=612820&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopicSubscriber.java
 Thu Jan 17 05:59:52 2008
@@ -0,0 +1,11 @@
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+
+import javax.jms.Topic;
+
+public interface AMQTopicSubscriber
+{
+
+    void addBindingKey(Topic topic, String bindingKey) throws AMQException;
+}

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=612820&r1=612819&r2=612820&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Thu Jan 17 05:59:52 2008
@@ -41,6 +41,7 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
+import org.apache.qpid.AMQException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -143,6 +144,13 @@
      */
     private Thread _receivingThread;
 
+
+    /**
+     * Used to store this consumer's queue name.
+     * Useful when more than one binding key should be used.
+     */
+    private AMQShortString _queuename;
+
     /**
      * autoClose denotes that the consumer will automatically cancel itself 
when there are no more messages to receive
      * on the queue.  This is used for queue browsing.
@@ -969,5 +977,20 @@
     public void stop()
     {
         // do nothing as this is a 0_10 feature
+    }
+
+    public AMQShortString getQueuename()
+    {
+        return _queuename;
+    }
+
+    public void setQueuename(AMQShortString queuename)
+    {
+        this._queuename = queuename;
+    }
+
+    public void addBindingKey(AMQDestination amqd, String routingKey) throws 
AMQException 
+    {
+        _session.addBindingKey(this,amqd,routingKey);
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?rev=612820&r1=612819&r2=612820&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
 Thu Jan 17 05:59:52 2008
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client;
 
+import org.apache.qpid.AMQException;
+
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
@@ -30,7 +32,7 @@
  * Wraps a MessageConsumer to fulfill the extended TopicSubscriber contract
  *
  */
-class TopicSubscriberAdaptor implements TopicSubscriber
+class TopicSubscriberAdaptor implements TopicSubscriber, AMQTopicSubscriber
 {
     private final Topic _topic;
     private final BasicMessageConsumer _consumer;
@@ -123,4 +125,8 @@
         return _consumer;
     }
 
+    public void addBindingKey(Topic topic, String bindingKey) throws 
AMQException
+    {
+        _consumer.addBindingKey((AMQDestination) topic, bindingKey);
+    }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=612820&r1=612819&r2=612820&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
 Thu Jan 17 05:59:52 2008
@@ -53,7 +53,7 @@
     protected ByteBuffer _data;
     private boolean _readableProperties = false;
     protected boolean _readableMessage = false;
-    protected boolean _changedData;
+    protected boolean _changedData = true;
     private Destination _destination;
     private JMSHeaderAdapter _headerAdapter;
     private BasicMessageConsumer _consumer;


Reply via email to