Author: rgreig
Date: Fri Dec 15 11:21:46 2006
New Revision: 487625

URL: http://svn.apache.org/viewvc?view=rev&rev=487625
Log:
QPID-194 Patch supplied by Rob Godfrey.

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=487625&r1=487624&r2=487625
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Fri Dec 15 11:21:46 2006
@@ -165,6 +165,8 @@
                 if (consumer == null)
                 {
                     _logger.warn("Received a message from queue " + message.deliverBody.consumerTag + " without a handler - ignoring...");
+                    _logger.warn("Consumers that exist: " + _consumers);
+                    _logger.warn("Session hashcode: " + System.identityHashCode(this));
                 }
                 else
                 {
@@ -958,26 +960,37 @@
      * @param queueName
      * @return the consumer tag generated by the broker
      */
-    private String consumeFromQueue(String queueName, AMQProtocolHandler protocolHandler, int prefetchHigh, int prefetchLow,
-                                    boolean noLocal, boolean exclusive, int acknowledgeMode, boolean nowait) throws AMQException
+    private void consumeFromQueue(BasicMessageConsumer consumer, String queueName, AMQProtocolHandler protocolHandler,
+                                  boolean nowait) throws AMQException
     {
         //fixme prefetch values are not used here. Do we need to have them as parametsrs?
         //need to generate a consumer tag on the client so we can exploit the nowait flag
         String tag = Integer.toString(_nextTag++);
 
-        AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
-                                                              queueName, tag, noLocal,
-                                                              acknowledgeMode == Session.NO_ACKNOWLEDGE,
-                                                              exclusive, nowait);
-        if (nowait)
+        consumer.setConsumerTag(tag);
+        // we must register the consumer in the map before we actually start listening
+        _consumers.put(tag, consumer);
+        try
         {
-            protocolHandler.writeFrame(jmsConsume);
+            AMQFrame jmsConsume = BasicConsumeBody.createAMQFrame(_channelId, 0,
+                                                                  queueName, tag, consumer.isNoLocal(),
+                                                                  consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
+                                                                  consumer.isExclusive(), nowait);
+            if (nowait)
+            {
+                protocolHandler.writeFrame(jmsConsume);
+            }
+            else
+            {
+                protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+            }
         }
-        else
+        catch (AMQException e)
         {
-            protocolHandler.syncWrite(jmsConsume, BasicConsumeOkBody.class);
+            // clean-up the map in the event of an error
+            _consumers.remove(tag);
+            throw e;
         }
-        return tag;
     }
 
     public Queue createQueue(String queueName) throws JMSException
@@ -1354,12 +1367,7 @@
 
         bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
 
-        String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetchHigh(),
-                                              consumer.getPrefetchLow(), consumer.isNoLocal(), consumer.isExclusive(),
-                                              consumer.getAcknowledgeMode(), nowait);
-
-        consumer.setConsumerTag(consumerTag);
-        _consumers.put(consumerTag, consumer);
+        consumeFromQueue(consumer, queueName, protocolHandler, nowait);
     }
 
     /**

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=487625&r1=487624&r2=487625
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Fri Dec 15 11:21:46 2006
@@ -280,7 +280,7 @@
     public Message receive(long l) throws JMSException
     {
        checkPreConditions();
-
+        
         acquireReceiving();
 
         try


Reply via email to