Author: arnaudsimon
Date: Wed Feb  6 08:00:22 2008
New Revision: 619043

URL: http://svn.apache.org/viewvc?rev=619043&view=rev
Log:
Added close logic for releasing pre-fetched messages, see QPID-778

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=619043&r1=619042&r2=619043&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Feb  6 08:00:22 2008
@@ -195,7 +195,7 @@
      *
      * @todo Weaken the type once [EMAIL PROTECTED] 
FlowControllingBlockingQueue} implements Queue.
      */
-    private final FlowControllingBlockingQueue _queue;
+    protected final FlowControllingBlockingQueue _queue;
 
     /**
      * Holds the highest received delivery tag.

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=619043&r1=619042&r2=619043&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Wed Feb  6 08:00:22 2008
@@ -27,6 +27,7 @@
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpidity.nclient.Session;
 import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
 import org.apache.qpidity.ErrorCode;
@@ -43,6 +44,8 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.UUID;
 import java.util.Map;
+import java.util.Iterator;
+
 /**
  * This is a 0.10 Session
  */
@@ -239,6 +242,25 @@
     }
 
     /**
+     * We need to release message that may be pre-fetched in the local queue
+     *
+     * @throws JMSException
+     */
+    public void close() throws JMSException
+    {
+        super.close();
+        // We need to release pre-fetched messages
+        Iterator messages=_queue.iterator();
+        while (messages.hasNext())
+        {
+            UnprocessedMessage message=(UnprocessedMessage) messages.next();
+            messages.remove();
+            rejectMessage(message, true);
+        }
+    }
+
+
+    /**
      * Commit the receipt and the delivery of all messages exchanged by this 
session resources.
      */
     public void sendCommit() throws AMQException, FailoverException
@@ -615,10 +637,13 @@
         }
     }
 
-     void stop() throws AMQException
+
+
+
+    void stop() throws AMQException
     {
         super.stop();
-           for(BasicMessageConsumer  c:  _consumers.values())
+        for(BasicMessageConsumer  c:  _consumers.values())
         {
               c.stop();
         }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=619043&r1=619042&r2=619043&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
 Wed Feb  6 08:00:22 2008
@@ -39,6 +39,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.Iterator;
 
 /**
  * This is a 0.10 message consumer.
@@ -449,5 +450,18 @@
     public void stop()
     {
         _isStarted = false;
+    }
+
+    public void close() throws JMSException
+    {
+        super.close();
+        // release message that may be staged
+        Iterator messages=_synchronousQueue.iterator();
+        while (messages.hasNext())
+        {
+            AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+            messages.remove();
+            _session.rejectMessage(message, true);
+        }
     }
 }


Reply via email to