Author: ritchiem
Date: Wed Oct 17 09:39:20 2007
New Revision: 585565

URL: http://svn.apache.org/viewvc?rev=585565&view=rev
Log:
QPID-643 : CSDM causes duplicate message delivery. Don't deliver messages that 
have been taken.

Modified:
    
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Modified: 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=585565&r1=585564&r2=585565&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Wed Oct 17 09:39:20 2007
@@ -883,7 +883,36 @@
                             _log.trace(debugIdentity() + "Delivering Message:" 
+ msg.debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);
                         }
-                        msg.taken(_queue, s);
+
+                        if (msg.taken(_queue, s))
+                        {
+                            // Message has already been delivered, so don't redeliver it.
+                            // This can currently occur because of the recursive call below.
+                            // During unit tests the send can occur and the
+                            // client then rejects the message;
+                            // this reject releases the message, so by the time the
+                            // if (!msg.isTaken(_queue)) check is made below
+                            // the message has been released, and that thread loops to send the message again.
+                            // Of course, by the time it gets back to here, the thread that released the
+                            // message is now ready to send it. Here is a sample trace for reference:
+//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, 
session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, 
session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145 
Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 
msg:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message 
by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, 
consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 
ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738
 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, 
session=anonymous(5050419), resendQueue=false]}
+//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, 
session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: 
Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!: 
Subscriber:28398657
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, 
session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, 
session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 
Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 
msg:(HC:5529738 ID:145 Ref:1)
+//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, 
session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 
Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, 
session=anonymous(26960027), resendQueue=false]}
+                            // Note: In the last request to take the message, from pool-917-thread-4, the message has already been
+                            // taken by the previous call made by pool-917-thread-2.
+
+
+
+                            return;
+                        }
                         //Deliver the message
                         s.send(msg, _queue);
                     }
@@ -897,6 +926,10 @@
                     }
                 }
 
+                //
+                // Why do we do this? What was the reasoning? We should have a better approach
+                // than recursing and rejecting if someone else sends it before we do.
+                //
                 if (!msg.isTaken(_queue))
                 {
                     if (debugEnabled)


Reply via email to