Author: ritchiem
Date: Tue Feb 13 03:46:37 2007
New Revision: 506979

URL: http://svn.apache.org/viewvc?view=rev&rev=506979
Log:
QPID-346 Message loss after rollback/recover

With Multiple consumers closing and requeuing occasionally a message would be 
lost given the use of msg.getDeliveredToConsumer() as this will be set on the 
first send an so on the infrequent occasion that a subscriber closes whilst a 
message is being delivered then that message would be lost.

AMQChannel - Fixed bug where messages would not be requeued on consumer 
closure. Increased quantity of logging.
AMQMessage - Added method to get 'taken' status.
AMQQueue - Wrapped all log messages with correct is<X>Enabled() also removed 
debug() method as this makes debugging very difficult (log4j will always report 
the same log line, requiring searching of the file to fine the actual log line.)
ConcurrentSelectorDeliveryManager - Increased and enclosed logging 
(isXEnabled). Wrapped the send calls with a lock on the Subscription(Impl) such 
that a send will not occur if the Subscription(Impl) has been closed.

SubscriptionImpl - Used sendLock to set SI closed. This is used to mark 
subscription as suspended so no messages will be sent to it. Increased and 
wrapped logging.

SubscriptionSet - Added locking around the insertion and removal of entries to 
_subscription. As we need to retrieve the actual Subscription from the map when 
removing rather than the dummy object created for lookup. This requires two 
call to _subcription which is there for not thread safe. 

log4j - updated to have handy debug defaults that simply need uncommented.

Test to follow, once cleaned up.

Modified:
    incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml
    
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml 
(original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml Tue Feb 
13 03:46:37 2007
@@ -47,14 +47,25 @@
     </category>
 
     <category name="org.apache.qpid.framing.AMQDataBlockEncoder">
-       <priority value="info"/>
-   </category>
+        <priority value="info"/>
+    </category>
+
+    <!--category name="org.apache.qpid.server.queue.SubscriptionImpl">
+        <priority value="trace"/>
+    </category>
 
+    <category 
name="org.apache.qpid.server.queue.ConcurrentSelectorDeliveryManager">
+        <priority value="trace"/>
+    </category>
+
+     <category name="org.apache.qpid.server.AMQChannel">
+        <priority value="trace"/>
+    </category -->
 
-     <category name="org.apache.qpid">
+    <category name="org.apache.qpid">
         <priority value="warn"/>
     </category>
-    
+
     <root>
         <priority value="info"/>
         <appender-ref ref="STDOUT"/>

Modified: 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Tue Feb 13 03:46:37 2007
@@ -310,13 +310,15 @@
     {
         if (_log.isTraceEnabled())
         {
-            _log.trace("Unsubscribed consumer:" + consumerTag);
+            _log.trace("Unsubscribed consumer:" + consumerTag + "on Session " 
+ session +
+                       " Unacked Map Size:" + 
_unacknowledgedMessageMap.size());
         }
         AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
         if (q != null)
         {
             q.unregisterProtocolSession(session, _channelId, consumerTag);
         }
+        requeue();
     }
 
     /**
@@ -358,15 +360,18 @@
      */
     public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, 
String consumerTag, AMQQueue queue)
     {
-        if (_log.isTraceEnabled())
-        {
-            _log.trace("Adding unackedMessage (" + 
System.identityHashCode(message) + ") for channel " + _channelId +
-                       " with delivery tag " + deliveryTag + " and consumerTag 
" + consumerTag +
-                       " from queue:" + queue.getName());
-        }
-
         synchronized (_unacknowledgedMessageMapLock)
         {
+            if (_log.isTraceEnabled())
+            {
+                _log.trace("Adding unackedMessage (" + 
System.identityHashCode(message) + ") for channel " + _channelId +
+                           "(" + System.identityHashCode(this) + ")" +
+                           " with delivery tag " + deliveryTag + " and 
consumerTag " + consumerTag +
+                           " from queue:" + queue.getName() +
+                           " unackedSize[" + 
System.identityHashCode(_unacknowledgedMessageMap) + "](pre-put):"
+                           + _unacknowledgedMessageMap.size() + ":" + 
_unacknowledgedMessageMap.toString());
+            }
+
             _unacknowledgedMessageMap.put(deliveryTag, new 
UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
             _lastDeliveryTag = deliveryTag;
             checkSuspension();
@@ -404,6 +409,11 @@
 
                 unacked.queue.deliver(unacked.message);
             }
+        }
+
+        if (_unacknowledgedMessageMap.size() != 0)
+        {
+            _log.error("unack map is not empty after resend was item added to 
unack map whilst consumer is closing");
         }
     }
 

Modified: 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Tue Feb 13 03:46:37 2007
@@ -471,4 +471,9 @@
     {
         return _takenBySubcription;
     }
+
+    public boolean isTaken()
+    {
+        return _taken.get();
+    }
 }

Modified: 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Tue Feb 13 03:46:37 2007
@@ -341,7 +341,12 @@
     public void registerProtocolSession(AMQProtocolSession ps, int channel, 
String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
             throws AMQException
     {
-        debug("Registering protocol session {0} with channel {1} and consumer 
tag {2} with {3}", ps, channel, consumerTag, this);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(MessageFormat.format("Registering protocol session 
{0} with channel {1} [{4}] and " +
+                                               "consumer tag {2} with {3}",
+                                               ps, channel, consumerTag, this, 
System.identityHashCode(channel)));
+        }
 
         Subscription subscription = 
_subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, 
filters, noLocal, this);
 
@@ -358,8 +363,13 @@
 
     public void unregisterProtocolSession(AMQProtocolSession ps, int channel, 
String consumerTag) throws AMQException
     {
-        debug("Unregistering protocol session {0} with channel {1} and 
consumer tag {2} from {3}", ps, channel, consumerTag,
-              this);
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(MessageFormat.format("Unregistering protocol session 
{0} with channel {1} [{4}] " +
+                                               "and consumer tag {2} from {3}",
+                                               ps, channel, consumerTag, this, 
System.identityHashCode(channel)));
+        }
 
         Subscription removedSubscription;
         if ((removedSubscription = 
_subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
@@ -371,6 +381,12 @@
                                    " and protocol session key " + ps.getKey() 
+ " not registered with queue " + this);
         }
 
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(MessageFormat.format("Removed consumer tag {0} with 
channel {1} [{3}] from {2}",
+                                               consumerTag, channel, this, 
System.identityHashCode(channel)));
+        }
+
         removedSubscription.close();
 
         // if we are eligible for auto deletion, unregister from the queue 
registry
@@ -412,7 +428,10 @@
 
     protected void autodelete() throws AMQException
     {
-        debug("autodeleting {0}", this);
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug(MessageFormat.format("autodeleting {0}", this));
+        }
         delete();
     }
 
@@ -514,14 +533,6 @@
     public String toString()
     {
         return "Queue(" + _name + ")@" + System.identityHashCode(this);
-    }
-
-    private void debug(String msg, Object... args)
-    {
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug(MessageFormat.format(msg, args));
-        }
     }
 
     public long getMinimumAlertRepeatGap()

Modified: 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Tue Feb 13 03:46:37 2007
@@ -254,11 +254,17 @@
             // message will be null if we have no messages in the messageQueue.
             if (message == null)
             {
+                if (_log.isTraceEnabled())
+                {
+                    _log.trace("No messages for Subscriber(" + 
System.identityHashCode(sub) + ") from queue; (" + 
System.identityHashCode(messageQueue) + ")");
+                }
                 return;
             }
             if (_log.isDebugEnabled())
             {
-                _log.debug("Async Delivery Message (" + 
System.identityHashCode(message) + ") to :" + System.identityHashCode(this));
+                _log.debug("Async Delivery Message (" + 
System.identityHashCode(message) +
+                           ") by :" + System.identityHashCode(this) +
+                           ") to :" + System.identityHashCode(sub));
             }
 
             sub.send(message, queue);
@@ -333,17 +339,28 @@
 
     public void deliver(String name, AMQMessage msg) throws 
FailedDequeueException
     {
-        if (_log.isTraceEnabled())
+        if (_log.isDebugEnabled())
         {
-            _log.trace(id() + "deliver :" + System.identityHashCode(msg));
+            _log.debug(id() + " Deliver :" + System.identityHashCode(msg) + 
")");
         }
 
         //Check if we have someone to deliver the message to.
         _lock.lock();
         try
         {
+            if (_log.isTraceEnabled())
+            {
+                _log.trace(id() + " Getting next Subscriber for message :" + 
System.identityHashCode(msg) + ")");
+            }
+
             Subscription s = _subscriptions.nextSubscriber(msg);
 
+            if (_log.isTraceEnabled())
+            {
+                _log.trace(id() + " Subscriber (" + System.identityHashCode(s) 
+ ")" +
+                           " selected for message :" + 
System.identityHashCode(msg) + ")");
+            }
+
             if (s == null) //no-one can take the message right now.
             {
                 if (_log.isDebugEnabled())
@@ -392,18 +409,58 @@
             }
             else
             {
+                if (_log.isTraceEnabled())
+                {
+                    _log.trace(id() + " About to take sendLock for subscriber 
:" + System.identityHashCode(s) +
+                               " to deliver message:" + 
System.identityHashCode(msg));
+                }
+
                 //release lock now
                 _lock.unlock();
 
-                if (_log.isDebugEnabled())
+                synchronized (s.sendlock())
                 {
-                    _log.debug(id() + "Delivering Message:" + 
System.identityHashCode(msg) + " to(" +
-                               System.identityHashCode(s) + ") :" + s);
+                    if (!s.isSuspended())
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug(id() + "Delivering Message:" + 
System.identityHashCode(msg) + " to(" +
+                                       System.identityHashCode(s) + ") :" + s);
+                        }
+
+                        //Mark message as taken
+                        msg.taken(s);
+                        //Deliver the message
+                        s.send(msg, _queue);
+                    }
+                    else
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug(id() + " Subscription(" + 
System.identityHashCode(s) + ") became suspended between nextSubscriber and 
send");
+                        }
+                    }
+                }
+
+                if (!msg.isTaken())
+                {
+                    if (_log.isDebugEnabled())
+                    {
+                        _log.debug(id() + " Message(" + 
System.identityHashCode(msg) + ") has not been taken so recursing!:" +
+                                   " Subscriber:" + 
System.identityHashCode(s));
+                    }
+
+                    deliver(name, msg);
+                }
+                else
+                {
+                    if (_log.isDebugEnabled())
+                    {
+                        _log.debug(id() + " Message(" + 
System.identityHashCode(msg) +
+                                   ") has been taken so disregarding deliver 
request to Subscriber:" +
+                                   System.identityHashCode(s));
+                    }
                 }
-                //Mark message as taken
-                msg.taken(s);
-                //Deliver the message
-                s.send(msg, _queue);
             }
         }
         finally

Modified: 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
 Tue Feb 13 03:46:37 2007
@@ -69,7 +69,7 @@
     private boolean _closed = false;
 
     private AMQQueue _queue;
-    private final AtomicBoolean _resending = new AtomicBoolean(false);
+    private final AtomicBoolean _sendLock = new AtomicBoolean(false);
 
     public static class Factory implements SubscriptionFactory
     {
@@ -193,7 +193,18 @@
 
     public String toString()
     {
-        return "[channel=" + channel + ", consumerTag=" + consumerTag + ", 
session=" + protocolSession.getKey() + "]";
+        String subscriber = "[channel=" + channel +
+                            ", consumerTag=" + consumerTag +
+                            ", session=" + protocolSession.getKey() +
+                            ", resendQueue=" + (_resendQueue != null);
+
+        if (_resendQueue != null)
+        {
+            subscriber += ", resendSize=" + _resendQueue.size();
+        }
+
+
+        return subscriber + "]";
     }
 
     /**
@@ -239,7 +250,7 @@
             {
                 channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, 
consumerTag, queue);
             }
-            ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, 
msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
+            ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, 
msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered());
             AMQDataBlock frame = msg.getDataBlock(deliver, 
channel.getChannelId());
 
             //fixme what is wrong with this?
@@ -275,7 +286,7 @@
                     channel.addUnacknowledgedMessage(msg, deliveryTag, 
consumerTag, queue);
                 }
 
-                ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, 
msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
+                ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, 
msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered());
                 AMQDataBlock frame = msg.getDataBlock(deliver, 
channel.getChannelId());
 
                 //fixme what is wrong with this?
@@ -292,7 +303,18 @@
 
     public boolean isSuspended()
     {
-        return channel.isSuspended() && !_resending.get();
+        if (_logger.isTraceEnabled())
+        {
+            if (channel.isSuspended())
+            {
+                _logger.trace("Subscription(" + System.identityHashCode(this) 
+ ") channel's is susupended");
+            }
+            if (_sendLock.get())
+            {
+                _logger.trace("Subscription(" + System.identityHashCode(this) 
+ ") has sendLock set so closing.");
+            }
+        }
+        return channel.isSuspended() || _sendLock.get();
     }
 
     /**
@@ -386,7 +408,20 @@
 
     public void close()
     {
-        _logger.info("Closing subscription:" + this);
+        synchronized (_sendLock)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Setting SendLock true");
+            }
+
+            _sendLock.set(true);
+
+        }
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Closing subscription (" + 
System.identityHashCode(this) + "):" + this);
+        }
 
         if (_resendQueue != null && !_resendQueue.isEmpty())
         {
@@ -411,17 +446,17 @@
             ));
             _closed = true;
         }
+
     }
 
     private void requeue()
     {
-
         if (_queue != null)
         {
-            _logger.trace("Requeuing :" + _resendQueue.size() + " messages");
-
-            //Take  control over to this thread for delivering messages from 
the Async Delivery.
-            setResending(true);
+            if (_logger.isTraceEnabled())
+            {
+                _logger.trace("Requeuing :" + _resendQueue.size() + " 
messages");
+            }
 
             while (!_resendQueue.isEmpty())
             {
@@ -441,8 +476,6 @@
                 }
             }
 
-            setResending(false);
-
             if (!_resendQueue.isEmpty())
             {
                 _logger.error("[MESSAGES LOST]Unable to re-deliver messages as 
queue is null.");
@@ -462,14 +495,6 @@
         _resendQueue = null;
     }
 
-    private void setResending(boolean resending)
-    {
-        synchronized (_resending)
-        {
-            _resending.set(resending);
-        }
-    }
-
     public boolean isBrowser()
     {
         return _isBrowser;
@@ -528,7 +553,7 @@
 
     public Object sendlock()
     {
-        return _resending;
+        return _sendLock;
     }
 
     private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String 
routingKey, String exchange, boolean redelivered)

Modified: 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
--- 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
 (original)
+++ 
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
 Tue Feb 13 03:46:37 2007
@@ -32,11 +32,12 @@
 {
     private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
 
-    /** List of registered subscribers */
+    /** List of registered subscribers all edits must be done whilst holidng 
_subscriptionsChange */
     private List<Subscription> _subscriptions = new 
CopyOnWriteArrayList<Subscription>();
 
     /** Used to control the round robin delivery of content */
     private int _currentSubscriber;
+    private final Object _subscriptionsChange = new Object();
 
     /** Accessor for unit tests. */
     int getCurrentSubscriber()
@@ -46,7 +47,10 @@
 
     public void addSubscriber(Subscription subscription)
     {
-        _subscriptions.add(subscription);
+        synchronized (_subscriptionsChange)
+        {
+            _subscriptions.add(subscription);
+        }
     }
 
     /**
@@ -59,13 +63,27 @@
     public Subscription removeSubscriber(Subscription subscription)
     {
         // TODO: possibly need O(1) operation here.
-        int subIndex = _subscriptions.indexOf(subscription);
 
-        if (subIndex != -1)
+        Subscription sub = null;
+        synchronized (_subscriptionsChange)
+        {
+            int subIndex = _subscriptions.indexOf(subscription);
+
+            if (subIndex != -1)
+            {
+                //we can't just return the passed in subscription as it is a 
new object
+                // and doesn't contain the stored state we need.
+                //NOTE while this may be removed now anyone with an iterator 
will still have it in the list!!
+                sub = _subscriptions.remove(subIndex);
+            }
+            else
+            {
+                _log.error("Unable to remove from index(" + subIndex + 
")subscription:" + subscription);
+            }
+        }
+        if (sub != null)
         {
-            //we can't just return the passed in subscription as it is a new 
object
-            // and doesn't contain the stored state we need.
-            return _subscriptions.remove(subIndex);
+            return sub;
         }
         else
         {


Reply via email to