Author: ritchiem
Date: Tue Feb 13 03:46:37 2007
New Revision: 506979
URL: http://svn.apache.org/viewvc?view=rev&rev=506979
Log:
QPID-346 Message loss after rollback/recover
With multiple consumers closing and requeuing, a message would occasionally be
lost given the use of msg.getDeliveredToConsumer(), as this will be set on the
first send, and so on the infrequent occasion that a subscriber closes whilst a
message is being delivered, that message would be lost.
AMQChannel - Fixed bug where messages would not be requeued on consumer
closure. Increased quantity of logging.
AMQMessage - Added method to get 'taken' status.
AMQQueue - Wrapped all log messages with the correct is<X>Enabled() check; also
removed the debug() method, as it makes debugging very difficult (log4j will
always report the same log line, requiring a search of the file to find the
actual log line).
ConcurrentSelectorDeliveryManager - Increased and enclosed logging
(isXEnabled). Wrapped the send calls with a lock on the Subscription(Impl) such
that a send will not occur if the Subscription(Impl) has been closed.
SubscriptionImpl - Used sendLock to set SI closed. This is used to mark
subscription as suspended so no messages will be sent to it. Increased and
wrapped logging.
SubscriptionSet - Added locking around the insertion and removal of entries in
_subscriptions, as we need to retrieve the actual Subscription from the list
when removing, rather than the dummy object created for lookup. This requires
two calls on _subscriptions (indexOf, then remove), which together are
therefore not thread safe.
log4j - Updated to include handy debug defaults that simply need to be uncommented.
Test to follow, once cleaned up.
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml
(original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/etc/log4j.xml Tue Feb
13 03:46:37 2007
@@ -47,14 +47,25 @@
</category>
<category name="org.apache.qpid.framing.AMQDataBlockEncoder">
- <priority value="info"/>
- </category>
+ <priority value="info"/>
+ </category>
+
+ <!--category name="org.apache.qpid.server.queue.SubscriptionImpl">
+ <priority value="trace"/>
+ </category>
+ <category
name="org.apache.qpid.server.queue.ConcurrentSelectorDeliveryManager">
+ <priority value="trace"/>
+ </category>
+
+ <category name="org.apache.qpid.server.AMQChannel">
+ <priority value="trace"/>
+ </category -->
- <category name="org.apache.qpid">
+ <category name="org.apache.qpid">
<priority value="warn"/>
</category>
-
+
<root>
<priority value="info"/>
<appender-ref ref="STDOUT"/>
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
---
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Tue Feb 13 03:46:37 2007
@@ -310,13 +310,15 @@
{
if (_log.isTraceEnabled())
{
- _log.trace("Unsubscribed consumer:" + consumerTag);
+ _log.trace("Unsubscribed consumer:" + consumerTag + "on Session "
+ session +
+ " Unacked Map Size:" +
_unacknowledgedMessageMap.size());
}
AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
if (q != null)
{
q.unregisterProtocolSession(session, _channelId, consumerTag);
}
+ requeue();
}
/**
@@ -358,15 +360,18 @@
*/
public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag,
String consumerTag, AMQQueue queue)
{
- if (_log.isTraceEnabled())
- {
- _log.trace("Adding unackedMessage (" +
System.identityHashCode(message) + ") for channel " + _channelId +
- " with delivery tag " + deliveryTag + " and consumerTag
" + consumerTag +
- " from queue:" + queue.getName());
- }
-
synchronized (_unacknowledgedMessageMapLock)
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("Adding unackedMessage (" +
System.identityHashCode(message) + ") for channel " + _channelId +
+ "(" + System.identityHashCode(this) + ")" +
+ " with delivery tag " + deliveryTag + " and
consumerTag " + consumerTag +
+ " from queue:" + queue.getName() +
+ " unackedSize[" +
System.identityHashCode(_unacknowledgedMessageMap) + "](pre-put):"
+ + _unacknowledgedMessageMap.size() + ":" +
_unacknowledgedMessageMap.toString());
+ }
+
_unacknowledgedMessageMap.put(deliveryTag, new
UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
_lastDeliveryTag = deliveryTag;
checkSuspension();
@@ -404,6 +409,11 @@
unacked.queue.deliver(unacked.message);
}
+ }
+
+ if (_unacknowledgedMessageMap.size() != 0)
+ {
+ _log.error("unack map is not empty after resend was item added to
unack map whilst consumer is closing");
}
}
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
---
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Tue Feb 13 03:46:37 2007
@@ -471,4 +471,9 @@
{
return _takenBySubcription;
}
+
+ public boolean isTaken()
+ {
+ return _taken.get();
+ }
}
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
---
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Tue Feb 13 03:46:37 2007
@@ -341,7 +341,12 @@
public void registerProtocolSession(AMQProtocolSession ps, int channel,
String consumerTag, boolean acks, FieldTable filters, boolean noLocal)
throws AMQException
{
- debug("Registering protocol session {0} with channel {1} and consumer
tag {2} with {3}", ps, channel, consumerTag, this);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("Registering protocol session
{0} with channel {1} [{4}] and " +
+ "consumer tag {2} with {3}",
+ ps, channel, consumerTag, this,
System.identityHashCode(channel)));
+ }
Subscription subscription =
_subscriptionFactory.createSubscription(channel, ps, consumerTag, acks,
filters, noLocal, this);
@@ -358,8 +363,13 @@
public void unregisterProtocolSession(AMQProtocolSession ps, int channel,
String consumerTag) throws AMQException
{
- debug("Unregistering protocol session {0} with channel {1} and
consumer tag {2} from {3}", ps, channel, consumerTag,
- this);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("Unregistering protocol session
{0} with channel {1} [{4}] " +
+ "and consumer tag {2} from {3}",
+ ps, channel, consumerTag, this,
System.identityHashCode(channel)));
+ }
Subscription removedSubscription;
if ((removedSubscription =
_subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel,
@@ -371,6 +381,12 @@
" and protocol session key " + ps.getKey()
+ " not registered with queue " + this);
}
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("Removed consumer tag {0} with
channel {1} [{3}] from {2}",
+ consumerTag, channel, this,
System.identityHashCode(channel)));
+ }
+
removedSubscription.close();
// if we are eligible for auto deletion, unregister from the queue
registry
@@ -412,7 +428,10 @@
protected void autodelete() throws AMQException
{
- debug("autodeleting {0}", this);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug(MessageFormat.format("autodeleting {0}", this));
+ }
delete();
}
@@ -514,14 +533,6 @@
public String toString()
{
return "Queue(" + _name + ")@" + System.identityHashCode(this);
- }
-
- private void debug(String msg, Object... args)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug(MessageFormat.format(msg, args));
- }
}
public long getMinimumAlertRepeatGap()
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
---
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Tue Feb 13 03:46:37 2007
@@ -254,11 +254,17 @@
// message will be null if we have no messages in the messageQueue.
if (message == null)
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace("No messages for Subscriber(" +
System.identityHashCode(sub) + ") from queue; (" +
System.identityHashCode(messageQueue) + ")");
+ }
return;
}
if (_log.isDebugEnabled())
{
- _log.debug("Async Delivery Message (" +
System.identityHashCode(message) + ") to :" + System.identityHashCode(this));
+ _log.debug("Async Delivery Message (" +
System.identityHashCode(message) +
+ ") by :" + System.identityHashCode(this) +
+ ") to :" + System.identityHashCode(sub));
}
sub.send(message, queue);
@@ -333,17 +339,28 @@
public void deliver(String name, AMQMessage msg) throws
FailedDequeueException
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(id() + "deliver :" + System.identityHashCode(msg));
+ _log.debug(id() + " Deliver :" + System.identityHashCode(msg) +
")");
}
//Check if we have someone to deliver the message to.
_lock.lock();
try
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace(id() + " Getting next Subscriber for message :" +
System.identityHashCode(msg) + ")");
+ }
+
Subscription s = _subscriptions.nextSubscriber(msg);
+ if (_log.isTraceEnabled())
+ {
+ _log.trace(id() + " Subscriber (" + System.identityHashCode(s)
+ ")" +
+ " selected for message :" +
System.identityHashCode(msg) + ")");
+ }
+
if (s == null) //no-one can take the message right now.
{
if (_log.isDebugEnabled())
@@ -392,18 +409,58 @@
}
else
{
+ if (_log.isTraceEnabled())
+ {
+ _log.trace(id() + " About to take sendLock for subscriber
:" + System.identityHashCode(s) +
+ " to deliver message:" +
System.identityHashCode(msg));
+ }
+
//release lock now
_lock.unlock();
- if (_log.isDebugEnabled())
+ synchronized (s.sendlock())
{
- _log.debug(id() + "Delivering Message:" +
System.identityHashCode(msg) + " to(" +
- System.identityHashCode(s) + ") :" + s);
+ if (!s.isSuspended())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Delivering Message:" +
System.identityHashCode(msg) + " to(" +
+ System.identityHashCode(s) + ") :" + s);
+ }
+
+ //Mark message as taken
+ msg.taken(s);
+ //Deliver the message
+ s.send(msg, _queue);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Subscription(" +
System.identityHashCode(s) + ") became suspended between nextSubscriber and
send");
+ }
+ }
+ }
+
+ if (!msg.isTaken())
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" +
System.identityHashCode(msg) + ") has not been taken so recursing!:" +
+ " Subscriber:" +
System.identityHashCode(s));
+ }
+
+ deliver(name, msg);
+ }
+ else
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + " Message(" +
System.identityHashCode(msg) +
+ ") has been taken so disregarding deliver
request to Subscriber:" +
+ System.identityHashCode(s));
+ }
}
- //Mark message as taken
- msg.taken(s);
- //Deliver the message
- s.send(msg, _queue);
}
}
finally
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
---
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
(original)
+++
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Tue Feb 13 03:46:37 2007
@@ -69,7 +69,7 @@
private boolean _closed = false;
private AMQQueue _queue;
- private final AtomicBoolean _resending = new AtomicBoolean(false);
+ private final AtomicBoolean _sendLock = new AtomicBoolean(false);
public static class Factory implements SubscriptionFactory
{
@@ -193,7 +193,18 @@
public String toString()
{
- return "[channel=" + channel + ", consumerTag=" + consumerTag + ",
session=" + protocolSession.getKey() + "]";
+ String subscriber = "[channel=" + channel +
+ ", consumerTag=" + consumerTag +
+ ", session=" + protocolSession.getKey() +
+ ", resendQueue=" + (_resendQueue != null);
+
+ if (_resendQueue != null)
+ {
+ subscriber += ", resendSize=" + _resendQueue.size();
+ }
+
+
+ return subscriber + "]";
}
/**
@@ -239,7 +250,7 @@
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag,
consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag,
msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag,
msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered());
AMQDataBlock frame = msg.getDataBlock(deliver,
channel.getChannelId());
//fixme what is wrong with this?
@@ -275,7 +286,7 @@
channel.addUnacknowledgedMessage(msg, deliveryTag,
consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag,
msg.getRoutingKey(), msg.getExchangeName(),msg.isRedelivered());
+ ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag,
msg.getRoutingKey(), msg.getExchangeName(), msg.isRedelivered());
AMQDataBlock frame = msg.getDataBlock(deliver,
channel.getChannelId());
//fixme what is wrong with this?
@@ -292,7 +303,18 @@
public boolean isSuspended()
{
- return channel.isSuspended() && !_resending.get();
+ if (_logger.isTraceEnabled())
+ {
+ if (channel.isSuspended())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this)
+ ") channel's is susupended");
+ }
+ if (_sendLock.get())
+ {
+ _logger.trace("Subscription(" + System.identityHashCode(this)
+ ") has sendLock set so closing.");
+ }
+ }
+ return channel.isSuspended() || _sendLock.get();
}
/**
@@ -386,7 +408,20 @@
public void close()
{
- _logger.info("Closing subscription:" + this);
+ synchronized (_sendLock)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting SendLock true");
+ }
+
+ _sendLock.set(true);
+
+ }
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing subscription (" +
System.identityHashCode(this) + "):" + this);
+ }
if (_resendQueue != null && !_resendQueue.isEmpty())
{
@@ -411,17 +446,17 @@
));
_closed = true;
}
+
}
private void requeue()
{
-
if (_queue != null)
{
- _logger.trace("Requeuing :" + _resendQueue.size() + " messages");
-
- //Take control over to this thread for delivering messages from
the Async Delivery.
- setResending(true);
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Requeuing :" + _resendQueue.size() + "
messages");
+ }
while (!_resendQueue.isEmpty())
{
@@ -441,8 +476,6 @@
}
}
- setResending(false);
-
if (!_resendQueue.isEmpty())
{
_logger.error("[MESSAGES LOST]Unable to re-deliver messages as
queue is null.");
@@ -462,14 +495,6 @@
_resendQueue = null;
}
- private void setResending(boolean resending)
- {
- synchronized (_resending)
- {
- _resending.set(resending);
- }
- }
-
public boolean isBrowser()
{
return _isBrowser;
@@ -528,7 +553,7 @@
public Object sendlock()
{
- return _resending;
+ return _sendLock;
}
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String
routingKey, String exchange, boolean redelivered)
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=506979&r1=506978&r2=506979
==============================================================================
---
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
(original)
+++
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
Tue Feb 13 03:46:37 2007
@@ -32,11 +32,12 @@
{
private static final Logger _log = Logger.getLogger(SubscriptionSet.class);
- /** List of registered subscribers */
+ /** List of registered subscribers all edits must be done whilst holidng
_subscriptionsChange */
private List<Subscription> _subscriptions = new
CopyOnWriteArrayList<Subscription>();
/** Used to control the round robin delivery of content */
private int _currentSubscriber;
+ private final Object _subscriptionsChange = new Object();
/** Accessor for unit tests. */
int getCurrentSubscriber()
@@ -46,7 +47,10 @@
public void addSubscriber(Subscription subscription)
{
- _subscriptions.add(subscription);
+ synchronized (_subscriptionsChange)
+ {
+ _subscriptions.add(subscription);
+ }
}
/**
@@ -59,13 +63,27 @@
public Subscription removeSubscriber(Subscription subscription)
{
// TODO: possibly need O(1) operation here.
- int subIndex = _subscriptions.indexOf(subscription);
- if (subIndex != -1)
+ Subscription sub = null;
+ synchronized (_subscriptionsChange)
+ {
+ int subIndex = _subscriptions.indexOf(subscription);
+
+ if (subIndex != -1)
+ {
+ //we can't just return the passed in subscription as it is a
new object
+ // and doesn't contain the stored state we need.
+ //NOTE while this may be removed now anyone with an iterator
will still have it in the list!!
+ sub = _subscriptions.remove(subIndex);
+ }
+ else
+ {
+ _log.error("Unable to remove from index(" + subIndex +
")subscription:" + subscription);
+ }
+ }
+ if (sub != null)
{
- //we can't just return the passed in subscription as it is a new
object
- // and doesn't contain the stored state we need.
- return _subscriptions.remove(subIndex);
+ return sub;
}
else
{