Author: ritchiem
Date: Wed Feb 14 00:21:37 2007
New Revision: 507433

URL: http://svn.apache.org/viewvc?view=rev&rev=507433
Log:
QPID-346 Message loss after rollback/recover

Messages were still occasionally being sent twice.
AMQChannel - added trace level logging that will show an error if the same 
message is attempted to be sent to the same client.
AMQMessage - Remove logic that says the same subscriber can take always 'take' 
the message.
SubscriptionImpl - Release message when it is put back on to the resendQueue 
this will allow it to be re-'taken'
AMQSession - Added method to Dispatcher to clean up incomming _queue to try and 
prevent messages arriving for closed consumers.
BasicMessageConsumer - added comments

Modified:
    
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java

Modified: 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=507433&r1=507432&r2=507433
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Wed Feb 14 00:21:37 2007
@@ -372,6 +372,20 @@
                            + _unacknowledgedMessageMap.size() + ":" + 
_unacknowledgedMessageMap.toString());
             }
 
+            //Debug adding messages to this map.
+            if (_log.isTraceEnabled())
+            {
+                for (Map.Entry<Long, UnacknowledgedMessage> entry : 
_unacknowledgedMessageMap.entrySet())
+                {
+                    if (entry.getValue().message == message)
+                    {
+                        // this is set at error level but only output it if we 
are tracing.
+                        _log.error("Adding message (" + 
System.identityHashCode(message) +
+                                   ") that is already in unacked map entryTag:"
+                                   + entry.getKey() + " dT:" + deliveryTag);
+                    }
+                }
+            }
             _unacknowledgedMessageMap.put(deliveryTag, new 
UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
             _lastDeliveryTag = deliveryTag;
             checkSuspension();

Modified: 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=507433&r1=507432&r2=507433
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Wed Feb 14 00:21:37 2007
@@ -448,10 +448,6 @@
     {
         if (_taken.getAndSet(true))
         {
-            if (sub == _takenBySubcription)
-            {
-                return false;
-            }
             return true;
         }
         else

Modified: 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=507433&r1=507432&r2=507433
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 Wed Feb 14 00:21:37 2007
@@ -537,6 +537,10 @@
 
     public void addToResendQueue(AMQMessage msg)
     {
+        //fixme - will this be ok as we need to ensure redelivery to same 
subscriber first
+        //release the message so it can be redelivered
+        msg.release();
+        
         // add to our resend queue
         getResendQueue().add(msg);
 

Modified: 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=507433&r1=507432&r2=507433
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Feb 14 00:21:37 2007
@@ -47,6 +47,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.LinkedList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -289,6 +290,61 @@
             }
 
         }
+
+        /**
+         * The dispatcher should be stopped when calling this.
+         *
+         * @param consumerTag
+         */
+        public void removePending(String consumerTag)
+        {
+
+            synchronized (_lock)
+            {
+                boolean stopped = connectionStopped();
+
+                _dispatcher.setConnectionStopped(false);
+
+                LinkedList<UnprocessedMessage> tmpList = new 
LinkedList<UnprocessedMessage>();
+
+                while (_queue.size() != 0)
+                {
+                    UnprocessedMessage message = null;
+                    try
+                    {
+                        message = (UnprocessedMessage) _queue.take();
+
+                        if 
(!message.deliverBody.consumerTag.equals(consumerTag))
+                        {
+                            tmpList.add(message);
+                        }
+                        else
+                        {
+                            _logger.error("Pruned pending message for 
consumer:" + consumerTag);
+                        }
+                    }
+                    catch (InterruptedException e)
+                    {
+                        _logger.error("Interrupted whilst taking message");
+                    }
+                }
+
+                if (!tmpList.isEmpty())
+                {
+                    _logger.error("Tmp list is not empty");
+                }
+
+                for (UnprocessedMessage msg : tmpList)
+                {
+                    _queue.add(msg);
+                }
+                
+                if (stopped)
+                {
+                    _dispatcher.setConnectionStopped(stopped);
+                }
+            }
+        }
     }
 
     AMQSession(AMQConnection con, int channelId, boolean transacted, int 
acknowledgeMode,
@@ -599,8 +655,6 @@
             //Ensure we only try and close an open session.
             if (!_closed.getAndSet(true))
             {
-                // we pass null since this is not an error case
-                closeProducersAndConsumers(null);
 
                 try
                 {
@@ -618,6 +672,9 @@
                     // When control resumes at this point, a reply will have 
been received that
                     // indicates the broker has closed the channel successfully
 
+                    // we pass null since this is not an error case
+                    closeProducersAndConsumers(null);
+
                 }
                 catch (AMQException e)
                 {
@@ -1784,7 +1841,12 @@
      */
     void deregisterConsumer(BasicMessageConsumer consumer)
     {
+        //need to clear pending messages from session _queue that the 
dispatcher will handle
+        // or we will get 
+        //  _dispatcher.removePending(consumer.getConsumerTag());
+
         _consumers.remove(consumer.getConsumerTag());
+
         String subscriptionName = _reverseSubscriptionMap.remove(consumer);
         if (subscriptionName != null)
         {

Modified: 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=507433&r1=507432&r2=507433
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
 Wed Feb 14 00:21:37 2007
@@ -481,8 +481,13 @@
                     }
                 }
 
+
+                //this will remove consumer from _consumers map
                 deregisterConsumer();
+
+                // clears unacks from this consumer
                 _unacknowledgedDeliveryTags.clear();
+
                 if (_messageListener != null && _receiving.get())
                 {
                     _logger.info("Interrupting thread: " + _receivingThread);


Reply via email to