Author: ritchiem
Date: Wed Oct 17 09:39:20 2007
New Revision: 585565
URL: http://svn.apache.org/viewvc?rev=585565&view=rev
Log:
QPID-643 : CSDM causes duplicate message delivery. Don't deliver messages that
have been taken.
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=585565&r1=585564&r2=585565&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Wed Oct 17 09:39:20 2007
@@ -883,7 +883,36 @@
_log.trace(debugIdentity() + "Delivering Message:"
+ msg.debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
- msg.taken(_queue, s);
+
+ if (msg.taken(_queue, s))
+ {
+ //Message has been delivered so don't redeliver.
+ // This can currently occur because of the
recursive call below
+ // During unit tests the send can occur
+ // client then rejects
+ // this reject then releases the message by the
time the
+ // if(!msg.isTaken()) call is made below
+ // the message has been released so that thread
loops to send the message again
+ // of course by the time it gets back to here. the
thread that released the
+ // message is now ready to send it. Here is a
sample trace for reference
+//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41,
session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41,
session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145
Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214
msg:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message
by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000,
consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738
ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41,
session=anonymous(5050419), resendQueue=false]}
+//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33,
session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended:
Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!:
Subscriber:28398657
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33,
session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33,
session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145
Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172
msg:(HC:5529738 ID:145 Ref:1)
+//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33,
session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145
Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33,
session=anonymous(26960027), resendQueue=false]}
+ // Note: In the last request to take the message
from thread 4,5 the message has been
+ // taken by the previous call done by thread 2,5
+
+
+ return;
+ }
//Deliver the message
s.send(msg, _queue);
}
@@ -897,6 +926,10 @@
}
}
+ //
+ // Why do we do this? What was the reasoning? We should have a
better approach
+ // than recursion and rejecting if someone else sends it
before we do.
+ //
if (!msg.isTaken(_queue))
{
if (debugEnabled)