Author: ritchiem
Date: Wed Feb 14 00:21:37 2007
New Revision: 507433
URL: http://svn.apache.org/viewvc?view=rev&rev=507433
Log:
QPID-346 Message loss after rollback/recover
Messages were still occasionally being sent twice.
AMQChannel - added trace level logging that will show an error if the same
message is attempted to be sent to the same client.
AMQMessage - Remove logic that says the same subscriber can take always 'take'
the message.
SubscriptionImpl - Release message when it is put back on to the resendQueue
this will allow it to be re-'taken'
AMQSession - Added method to Dispatcher to clean up incomming _queue to try and
prevent messages arriving for closed consumers.
BasicMessageConsumer - added comments
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=507433&r1=507432&r2=507433
==============================================================================
---
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
(original)
+++
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
Wed Feb 14 00:21:37 2007
@@ -372,6 +372,20 @@
+ _unacknowledgedMessageMap.size() + ":" +
_unacknowledgedMessageMap.toString());
}
+ //Debug adding messages to this map.
+ if (_log.isTraceEnabled())
+ {
+ for (Map.Entry<Long, UnacknowledgedMessage> entry :
_unacknowledgedMessageMap.entrySet())
+ {
+ if (entry.getValue().message == message)
+ {
+ // this is set at error level but only output it if we
are tracing.
+ _log.error("Adding message (" +
System.identityHashCode(message) +
+ ") that is already in unacked map entryTag:"
+ + entry.getKey() + " dT:" + deliveryTag);
+ }
+ }
+ }
_unacknowledgedMessageMap.put(deliveryTag, new
UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
_lastDeliveryTag = deliveryTag;
checkSuspension();
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=507433&r1=507432&r2=507433
==============================================================================
---
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Wed Feb 14 00:21:37 2007
@@ -448,10 +448,6 @@
{
if (_taken.getAndSet(true))
{
- if (sub == _takenBySubcription)
- {
- return false;
- }
return true;
}
else
Modified:
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=507433&r1=507432&r2=507433
==============================================================================
---
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
(original)
+++
incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Wed Feb 14 00:21:37 2007
@@ -537,6 +537,10 @@
public void addToResendQueue(AMQMessage msg)
{
+ //fixme - will this be ok as we need to ensure redelivery to same
subscriber first
+ //release the message so it can be redelivered
+ msg.release();
+
// add to our resend queue
getResendQueue().add(msg);
Modified:
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=507433&r1=507432&r2=507433
==============================================================================
---
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Wed Feb 14 00:21:37 2007
@@ -47,6 +47,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
+import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -289,6 +290,61 @@
}
}
+
+ /**
+ * The dispatcher should be stopped when calling this.
+ *
+ * @param consumerTag
+ */
+ public void removePending(String consumerTag)
+ {
+
+ synchronized (_lock)
+ {
+ boolean stopped = connectionStopped();
+
+ _dispatcher.setConnectionStopped(false);
+
+ LinkedList<UnprocessedMessage> tmpList = new
LinkedList<UnprocessedMessage>();
+
+ while (_queue.size() != 0)
+ {
+ UnprocessedMessage message = null;
+ try
+ {
+ message = (UnprocessedMessage) _queue.take();
+
+ if
(!message.deliverBody.consumerTag.equals(consumerTag))
+ {
+ tmpList.add(message);
+ }
+ else
+ {
+ _logger.error("Pruned pending message for
consumer:" + consumerTag);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ _logger.error("Interrupted whilst taking message");
+ }
+ }
+
+ if (!tmpList.isEmpty())
+ {
+ _logger.error("Tmp list is not empty");
+ }
+
+ for (UnprocessedMessage msg : tmpList)
+ {
+ _queue.add(msg);
+ }
+
+ if (stopped)
+ {
+ _dispatcher.setConnectionStopped(stopped);
+ }
+ }
+ }
}
AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode,
@@ -599,8 +655,6 @@
//Ensure we only try and close an open session.
if (!_closed.getAndSet(true))
{
- // we pass null since this is not an error case
- closeProducersAndConsumers(null);
try
{
@@ -618,6 +672,9 @@
// When control resumes at this point, a reply will have
been received that
// indicates the broker has closed the channel successfully
+ // we pass null since this is not an error case
+ closeProducersAndConsumers(null);
+
}
catch (AMQException e)
{
@@ -1784,7 +1841,12 @@
*/
void deregisterConsumer(BasicMessageConsumer consumer)
{
+ //need to clear pending messages from session _queue that the
dispatcher will handle
+ // or we will get
+ // _dispatcher.removePending(consumer.getConsumerTag());
+
_consumers.remove(consumer.getConsumerTag());
+
String subscriptionName = _reverseSubscriptionMap.remove(consumer);
if (subscriptionName != null)
{
Modified:
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=507433&r1=507432&r2=507433
==============================================================================
---
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Wed Feb 14 00:21:37 2007
@@ -481,8 +481,13 @@
}
}
+
+ //this will remove consumer from _consumers map
deregisterConsumer();
+
+ // clears unacks from this consumer
_unacknowledgedDeliveryTags.clear();
+
if (_messageListener != null && _receiving.get())
{
_logger.info("Interrupting thread: " + _receivingThread);