Author: rgodfrey
Date: Wed Oct 17 12:59:58 2007
New Revision: 585655

URL: http://svn.apache.org/viewvc?rev=585655&view=rev
Log:
Merged revisions 
573738-573739,573741-574077,574079-574236,574238-574265,574267-574503,574505-574554,574556-574584,574586-574873,574875-574901,574903-575737,575739-575787,575789-575810,575812-577772,577774-577940,577942-578057,578059-578732,578734,578736-578744,578746-578827,578829-578844,578846-579114,579116-579146,579148-579197,579199-579228,579230-579573,579575-579576,579579-579601,579603-579613,579615-579708,579710-580021,580023-580039,580042-580060,580062-580065,580067-580080,580082-580257,580259-580264,580266-580350,580352-580984,580986-580991,580994-581001,581003-581170,581172-581188,581190-581206,581208-581245,581247-581292,581294-581539,581541-581565,581567-581620,581622-581626,581628-581646,581648-581967,581969-582197,582199-582200,582203-582204,582206-582262,582264,582267-583084,583087,583089-583104,583106-583146,583148-583153,583155-583169,583171-583172,583174-583398,583400-583414,583416-583417,583419-583437,583439-583482,583484-583517,583519-583545,583547,583549-
 
583774,583777-583807,583809-583881,583883-584107,584109-584112,584114-584123,584125-585653
 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1

........
  r585565 | ritchiem | 2007-10-17 17:39:20 +0100 (Wed, 17 Oct 2007) | 1 line
  
  QPID-643 : CSDM (ConcurrentSelectorDeliveryManager) causes duplicate message delivery. Don't deliver messages that have already been taken.
........
  r585570 | ritchiem | 2007-10-17 17:48:01 +0100 (Wed, 17 Oct 2007) | 1 line
  
  Update AMQMessage to reset the deliveredToConsumer flag to false when the message is released. This flag is now used for more than immediate delivery: it also records whether the message has been delivered, so that we can tell that the message should not be purged.
........
  r585575 | ritchiem | 2007-10-17 17:59:42 +0100 (Wed, 17 Oct 2007) | 1 line
  
  QPID-647 : Update ConcurrentSelectorDeliveryManager to restart the async delivery process if a message is queued that has the potential to be delivered.
........
  r585642 | rgodfrey | 2007-10-17 20:42:14 +0100 (Wed, 17 Oct 2007) | 1 line
  
  QPID-645 : TxnBuffer should rethrow exceptions encountered on commit
........

Modified:
    incubator/qpid/branches/M2/   (props changed)
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
    
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java

Propchange: incubator/qpid/branches/M2/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Wed Oct 17 12:59:58 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-584113,584124
+/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-585653

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Wed Oct 17 12:59:58 2007
@@ -133,7 +133,7 @@
     public boolean isReferenced()
     {
         return _referenceCount.get() > 0;
-    }    
+    }
 
     /**
      * Used to iterate through all the body frames associated with this 
message. Will not keep all the data in memory
@@ -558,6 +558,7 @@
                 taken.set(false);
             }
 
+            _deliveredToConsumer = false;
             _takenMap.put(queue, taken);
             _takenBySubcriptionMap.put(queue, null);
         }
@@ -694,7 +695,10 @@
         return false;
     }
 
-    /** Called when this message is delivered to a consumer. (used to 
implement the 'immediate' flag functionality). */
+    /**
+     * Called when this message is delivered to a consumer. (used to implement 
the 'immediate' flag functionality).
+     * And for selector efficiency.
+     */
     public void setDeliveredToConsumer()
     {
         _deliveredToConsumer = true;

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
 Wed Oct 17 12:59:58 2007
@@ -212,6 +212,15 @@
     }
 
     /**
+     *
+     * @return the state of the async processor.
+     */
+    public boolean isProcessingAsync()
+    {
+        return _processing.get();
+    }
+
+    /**
      * Returns all the messages in the Queue
      *
      * @return List of messages
@@ -821,6 +830,12 @@
                 {
                     addMessageToQueue(msg, deliverFirst);
 
+                    //if  we have a non-filtering subscriber but queued 
messages && we're not Async && we have other Active subs then something is 
wrong!
+                    if ((s != null && hasQueuedMessages()) && 
!isProcessingAsync() && _subscriptions.hasActiveSubscribers())
+                    {
+                        _queue.deliverAsync();
+                    }
+
                     //release lock now message is on queue.
                     _lock.unlock();
 
@@ -883,7 +898,36 @@
                             _log.trace(debugIdentity() + "Delivering Message:" 
+ msg.debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);
                         }
-                        msg.taken(_queue, s);
+
+                        if (msg.taken(_queue, s))
+                        {
+                            //Message has been delivered so don't redeliver.
+                            // This can currently occur because of the 
recursive call below
+                            // During unit tests the send can occur
+                            // client then rejects
+                            // this reject then releases the message by the 
time the
+                            // if(!msg.isTaken()) call is made below
+                            // the message has been released so that thread 
loops to send the message again
+                            // of course by the time it gets back to here. the 
thread that released the
+                            // message is now ready to send it. Here is a 
sample trace for reference
+//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, 
session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, 
session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145 
Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214 
msg:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message 
by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000, 
consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738 
ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738
 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41, 
session=anonymous(5050419), resendQueue=false]}
+//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, 
session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended: 
Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!: 
Subscriber:28398657
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
 ID:145 Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, 
session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, 
session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 
Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172 
msg:(HC:5529738 ID:145 Ref:1)
+//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, 
session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145 
Ref:1)]: 145; ref count: 1; taken for queues: 
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by 
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel:
 id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33, 
session=anonymous(26960027), resendQueue=false]}
+                            // Note: In the last request to take the message 
from thread 4,5 the message has been
+                            // taken by the previous call done by thread 2,5
+
+
+                            return;
+                        }
                         //Deliver the message
                         s.send(msg, _queue);
                     }
@@ -897,6 +941,10 @@
                     }
                 }
 
+                //
+                // Why do we do this? What was the reasoning? We should have a 
better approach
+                // than recursion and rejecting if someone else sends it 
before we do.
+                //
                 if (!msg.isTaken(_queue))
                 {
                     if (debugEnabled)
@@ -942,6 +990,8 @@
     {
         public void run()
         {
+            String startName = Thread.currentThread().getName();
+            Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName);
             boolean running = true;
             while (running && !_movingMessages.get())
             {
@@ -957,6 +1007,7 @@
                     _processing.set(false);
                 }
             }
+            Thread.currentThread().setName(startName);
         }
     }
 
@@ -983,8 +1034,9 @@
 
     private String currentStatus()
     {
-        return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)") 
+
-               "(" + _messages.size() + ":" + 
((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
+        return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)") 
+
+               "(" + ((ConcurrentLinkedMessageQueueAtomicSize) 
_messages).headSize() +
+               ":" + (_messages.size() - 
((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " +
                " Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
                "(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
                " Active:" + _subscriptions.hasActiveSubscribers() +

Modified: 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
 (original)
+++ 
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
 Wed Oct 17 12:59:58 2007
@@ -54,7 +54,7 @@
         _ops.clear();
     }
 
-    private boolean prepare(StoreContext context)
+    private boolean prepare(StoreContext context) throws AMQException
     {
         for (int i = 0; i < _ops.size(); i++)
         {
@@ -63,19 +63,31 @@
             {
                 op.prepare(context);
             }
-            catch (Exception e)
+            catch (AMQException e)
             {
-                //compensate previously prepared ops
-                for (int j = 0; j < i; j++)
-                {
-                    _ops.get(j).undoPrepare();
-                }
-                return false;
+                undoPrepare(i);
+                throw e;
+            }
+            catch (RuntimeException e)
+            {
+                undoPrepare(i);
+                throw e;
             }
         }
         return true;
     }
 
+    private void undoPrepare(int lastPrepared)
+    {
+        //compensate previously prepared ops
+        for (int j = 0; j < lastPrepared; j++)
+        {
+            _ops.get(j).undoPrepare();
+        }
+    }
+
+       
+       
     public void rollback(StoreContext context) throws AMQException
     {
         for (TxnOp op : _ops)

Modified: 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
 (original)
+++ 
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
 Wed Oct 17 12:59:58 2007
@@ -27,6 +27,7 @@
 import org.apache.qpid.server.store.StoreContext;
 
 import java.util.LinkedList;
+import java.util.NoSuchElementException;
 
 public class TxnBufferTest extends TestCase
 {
@@ -78,7 +79,16 @@
         buffer.enlist(new FailedPrepare());
         buffer.enlist(new MockOp());
 
-        buffer.commit(null);
+               try
+        {
+            buffer.commit(null);
+            
+        }
+        catch (NoSuchElementException e)
+        {
+
+        }
+
         validateOps();
         store.validate();
     }


Reply via email to