Author: arnaudsimon
Date: Wed Feb 6 08:00:22 2008
New Revision: 619043
URL: http://svn.apache.org/viewvc?rev=619043&view=rev
Log:
Added close logic for releasing pre-fetched messages, see QPID-778
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=619043&r1=619042&r2=619043&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Wed Feb 6 08:00:22 2008
@@ -195,7 +195,7 @@
*
* @todo Weaken the type once {@link FlowControllingBlockingQueue} implements Queue.
*/
- private final FlowControllingBlockingQueue _queue;
+ protected final FlowControllingBlockingQueue _queue;
/**
* Holds the highest received delivery tag.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=619043&r1=619042&r2=619043&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Wed Feb 6 08:00:22 2008
@@ -27,6 +27,7 @@
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.FiledTableSupport;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.ErrorCode;
@@ -43,6 +44,8 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.UUID;
import java.util.Map;
+import java.util.Iterator;
+
/**
* This is a 0.10 Session
*/
@@ -239,6 +242,25 @@
}
/**
+ * We need to release message that may be pre-fetched in the local queue
+ *
+ * @throws JMSException
+ */
+ public void close() throws JMSException
+ {
+ super.close();
+ // We need to release pre-fetched messages
+ Iterator messages=_queue.iterator();
+ while (messages.hasNext())
+ {
+ UnprocessedMessage message=(UnprocessedMessage) messages.next();
+ messages.remove();
+ rejectMessage(message, true);
+ }
+ }
+
+
+ /**
* Commit the receipt and the delivery of all messages exchanged by this
session resources.
*/
public void sendCommit() throws AMQException, FailoverException
@@ -615,10 +637,13 @@
}
}
- void stop() throws AMQException
+
+
+
+ void stop() throws AMQException
{
super.stop();
- for(BasicMessageConsumer c: _consumers.values())
+ for(BasicMessageConsumer c: _consumers.values())
{
c.stop();
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=619043&r1=619042&r2=619043&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Wed Feb 6 08:00:22 2008
@@ -39,6 +39,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.Iterator;
/**
* This is a 0.10 message consumer.
@@ -449,5 +450,18 @@
public void stop()
{
_isStarted = false;
+ }
+
+ public void close() throws JMSException
+ {
+ super.close();
+ // release message that may be staged
+ Iterator messages=_synchronousQueue.iterator();
+ while (messages.hasNext())
+ {
+ AbstractJMSMessage message=(AbstractJMSMessage) messages.next();
+ messages.remove();
+ _session.rejectMessage(message, true);
+ }
}
}