Author: rgreig
Date: Mon Dec 18 04:12:57 2006
New Revision: 488249

URL: http://svn.apache.org/viewvc?view=rev&rev=488249
Log:
QPID-209 Fix to clear unacked message list on recover()

Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=488249&r1=488248&r2=488249
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Mon Dec 18 04:12:57 2006
@@ -23,23 +23,22 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.server.handler.ExchangeBoundHandler;
-import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.client.failover.FailoverSupport;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.JMSStreamMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.protocol.AMQMethodEvent;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
+import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.handler.ExchangeBoundHandler;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.URLSyntaxException;
 
-
 import javax.jms.*;
 import javax.jms.IllegalStateException;
 import java.io.Serializable;
@@ -50,7 +49,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 
 public class AMQSession extends Closeable implements Session, QueueSession, TopicSession
 {
@@ -184,7 +182,7 @@
                 }
                 else
                 {
-        
+
                     consumer.notifyMessage(message, _channelId);
 
                 }
@@ -698,7 +696,10 @@
     {
         checkNotClosed();
         checkNotTransacted(); // throws IllegalStateException if a transacted session
-
+        for (BasicMessageConsumer consumer : _consumers.values())
+        {
+            consumer.clearUnackedMessages();
+        }
         _connection.getProtocolHandler().writeFrame(BasicRecoverBody.createAMQFrame(_channelId, false));
     }
 
@@ -1474,7 +1475,7 @@
         String subscriptionName = _reverseSubscriptionMap.remove(consumer);
         if(subscriptionName != null)
         {
-            _subscriptions.remove(subscriptionName);    
+            _subscriptions.remove(subscriptionName);
         }
 
         Destination dest = consumer.getDestination();

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=488249&r1=488248&r2=488249
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Mon Dec 18 04:12:57 2006
@@ -36,19 +36,16 @@
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageListener;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.Iterator;
 import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.Iterator;
 
 public class BasicMessageConsumer extends Closeable implements MessageConsumer
 {
-    private static final Logger _logger = 
Logger.getLogger(BasicMessageConsumer.class);  
+    private static final Logger _logger = 
Logger.getLogger(BasicMessageConsumer.class);
 
     /**
      * The connection being used by this consumer
@@ -296,7 +293,7 @@
     public Message receive(long l) throws JMSException
     {
        checkPreConditions();
-        
+
         acquireReceiving();
 
         try
@@ -316,7 +313,7 @@
                 preApplicationProcessing(m);
                 postDeliver(m);
             }
-            
+
             return m;
         }
         catch (InterruptedException e)
@@ -589,5 +586,13 @@
         {
             throw new IllegalStateException("Consumer is closed");
         }
+    }
+
+    /**
+     * Called on recovery to reset the list of delivery tags
+     */
+    public void clearUnackedMessages()
+    {
+        _unacknowledgedDeliveryTags.clear();
     }
 }


Reply via email to