Author: rgreig
Date: Mon Dec 18 04:12:57 2006
New Revision: 488249
URL: http://svn.apache.org/viewvc?view=rev&rev=488249
Log:
QPID-209 Fix to clear unacked message list on recover()
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=488249&r1=488248&r2=488249
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Dec 18 04:12:57 2006
@@ -23,23 +23,22 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.server.handler.ExchangeBoundHandler;
-import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.client.failover.FailoverSupport;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.handler.ExchangeBoundHandler;
import org.apache.qpid.url.AMQBindingURL;
import org.apache.qpid.url.URLSyntaxException;
-
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.io.Serializable;
@@ -50,7 +49,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
{
@@ -184,7 +182,7 @@
}
else
{
-
+
consumer.notifyMessage(message, _channelId);
}
@@ -698,7 +696,10 @@
{
checkNotClosed();
checkNotTransacted(); // throws IllegalStateException if a transacted session
-
+ for (BasicMessageConsumer consumer : _consumers.values())
+ {
+ consumer.clearUnackedMessages();
+ }
_connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
}
@@ -1474,7 +1475,7 @@
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if(subscriptionName != null)
{
- _subscriptions.remove(subscriptionName);
+ _subscriptions.remove(subscriptionName);
}
Destination dest = consumer.getDestination();
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=488249&r1=488248&r2=488249
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Mon Dec 18 04:12:57 2006
@@ -36,19 +36,16 @@
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Iterator;
public class BasicMessageConsumer extends Closeable implements MessageConsumer
{
- private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
+ private static final Logger _logger = Logger.getLogger(BasicMessageConsumer.class);
/**
* The connection being used by this consumer
@@ -296,7 +293,7 @@
public Message receive(long l) throws JMSException
{
checkPreConditions();
-
+
acquireReceiving();
try
@@ -316,7 +313,7 @@
preApplicationProcessing(m);
postDeliver(m);
}
-
+
return m;
}
catch (InterruptedException e)
@@ -589,5 +586,13 @@
{
throw new IllegalStateException("Consumer is closed");
}
+ }
+
+ /**
+ * Called on recovery to reset the list of delivery tags
+ */
+ public void clearUnackedMessages()
+ {
+ _unacknowledgedDeliveryTags.clear();
}
}