Author: aidan
Date: Thu Aug  7 04:15:01 2008
New Revision: 683583

URL: http://svn.apache.org/viewvc?rev=683583&view=rev
Log:
QPID-1218: Boost broker performance by lots.

AMQMessage: Allow the reference count to be incremented by several references in a single call

IncomingMessage: Increment message references in one go, flatten delivery loop 
a little.
Make _destinationQueues an ArrayList, massively increasing performance.
Iterate through it with indexing.

AccessResult: store the authorizer as a plain String instead of a StringBuilder

Update tests and exchanges to reflect new API usage. Almost all of this is just
type narrowing, except for TopicExchange, where there's an extra copy; it isn't
too bad relative to the number of HashSet and HashMap operations that go on
inside there.

Modified:
    
incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
    
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java

Modified: 
incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
 Thu Aug  7 04:15:01 2008
@@ -204,7 +204,7 @@
         
((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
         AMQQueue q = getQueueRegistry().getQueue(new 
AMQShortString("diagnosticqueue"));
 
-        Collection<AMQQueue> queues =  new ArrayList<AMQQueue>();
+        ArrayList<AMQQueue> queues =  new ArrayList<AMQQueue>();
         queues.add(q);
         payload.enqueue(queues);
         

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
 Thu Aug  7 04:15:01 2008
@@ -191,7 +191,7 @@
 
         final AMQShortString routingKey = payload.getRoutingKey() == null ? 
AMQShortString.EMPTY_STRING : payload.getRoutingKey();
 
-        final List<AMQQueue> queues = (routingKey == null) ? null : 
_index.get(routingKey);
+        final ArrayList<AMQQueue> queues = (routingKey == null) ? null : 
_index.get(routingKey);
 
         if (_logger.isDebugEnabled())
         {

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
 Thu Aug  7 04:15:01 2008
@@ -249,7 +249,7 @@
             _logger.debug("Exchange " + getName() + ": routing message with 
headers " + headers);
         }
         boolean routed = false;
-        Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
+        ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
         for (Registration e : _bindings)
         {
 

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
 Thu Aug  7 04:15:01 2008
@@ -37,12 +37,12 @@
  */
 class Index
 {
-    private ConcurrentMap<AMQShortString, List<AMQQueue>> _index
-            = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+    private ConcurrentMap<AMQShortString, ArrayList<AMQQueue>> _index
+            = new ConcurrentHashMap<AMQShortString, ArrayList<AMQQueue>>();
 
     synchronized boolean add(AMQShortString key, AMQQueue queue)
     {
-        List<AMQQueue> queues = _index.get(key);
+        ArrayList<AMQQueue> queues = _index.get(key);
         if(queues == null)
         {
             queues = new ArrayList<AMQQueue>();
@@ -66,7 +66,7 @@
 
     synchronized boolean remove(AMQShortString key, AMQQueue queue)
     {
-        List<AMQQueue> queues = _index.get(key);
+        ArrayList<AMQQueue> queues = _index.get(key);
         if (queues != null)
         {
             queues = new ArrayList<AMQQueue>(queues);
@@ -87,7 +87,7 @@
         return false;
     }
 
-    List<AMQQueue> get(AMQShortString key)
+    ArrayList<AMQQueue> get(AMQShortString key)
     {
         return _index.get(key);
     }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
 Thu Aug  7 04:15:01 2008
@@ -32,7 +32,6 @@
 import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.queue.IncomingMessage;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.exchange.topic.TopicParser;
 import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
@@ -48,9 +47,6 @@
 import javax.management.openmbean.TabularDataSupport;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.lang.ref.WeakReference;
 
 public class TopicExchange extends AbstractExchange
@@ -532,7 +528,10 @@
 
         final AMQShortString routingKey = payload.getRoutingKey();
 
-        Collection<AMQQueue> queues = getMatchedQueues(payload, routingKey);
+        // The copy here is unfortunate, but not too bad relevant to the 
amount of
+        // things created and copied in getMatchedQueues
+        ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+        queues.addAll(getMatchedQueues(payload, routingKey));
 
         if(queues == null || queues.isEmpty())
         {

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
 Thu Aug  7 04:15:01 2008
@@ -291,12 +291,17 @@
         return this;
     }
 
-    /** Threadsafe. Increment the reference count on the message. */
     public boolean incrementReference()
     {
-        if(_referenceCount.incrementAndGet() <= 1)
+        return incrementReference(1);
+    }
+
+    /* Threadsafe. Increment the reference count on the message. */
+    public boolean incrementReference(int count)
+    {
+        if(_referenceCount.addAndGet(count) <= 1)
         {
-            _referenceCount.decrementAndGet();
+            _referenceCount.addAndGet(-count);
             return false;
         }
         else

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
 Thu Aug  7 04:15:01 2008
@@ -34,6 +34,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
 public class IncomingMessage implements Filterable<RuntimeException>
@@ -63,7 +64,7 @@
      * delivered. It is <b>cleared after delivery has been attempted</b>. Any 
persistent record of destinations is done
      * by the message handle.
      */
-    private Collection<AMQQueue> _destinationQueues;
+    private ArrayList<AMQQueue> _destinationQueues;
 
     private AMQProtocolSession _publisher;
     private MessageStore _messageStore;
@@ -134,21 +135,13 @@
 
             if(_destinationQueues != null)
             {
-                for (AMQQueue q : _destinationQueues)
+                for (int i = 0; i < _destinationQueues.size(); i++)
                 {
-                    if(q.isDurable())
-                    {
-
-                        
_messageStore.enqueueMessage(_txnContext.getStoreContext(), q, _messageId);
-                    }
+                    _messageStore.enqueueMessage(_txnContext.getStoreContext(),
+                            _destinationQueues.get(i), _messageId);
                 }
             }
-
         }
-
-
-
-
     }
 
     public AMQMessage deliverToQueues()
@@ -157,10 +150,9 @@
 
         // we get a reference to the destination queues now so that we can 
clear the
         // transient message data as quickly as possible
-        Collection<AMQQueue> destinationQueues = _destinationQueues;
         if (_logger.isDebugEnabled())
         {
-            _logger.debug("Delivering message " + _messageId + " to " + 
destinationQueues);
+            _logger.debug("Delivering message " + _messageId + " to " + 
_destinationQueues);
         }
 
         AMQMessage message = null;
@@ -178,10 +170,7 @@
             message.setExpiration(_expiration);
             message.setClientIdentifier(_publisher.getSessionIdentifier());
 
-
-
-
-            if ((destinationQueues == null) || destinationQueues.isEmpty())
+            if ((_destinationQueues == null) || _destinationQueues.size() == 0)
             {
 
                 if (isMandatory() || isImmediate())
@@ -196,10 +185,9 @@
             }
             else
             {
-                // TODO
-
                 int offset;
-                final int queueCount = destinationQueues.size();
+                final int queueCount = _destinationQueues.size();
+                message.incrementReference(queueCount);
                 if(queueCount == 1)
                 {
                     offset = 0;
@@ -212,33 +200,16 @@
                         offset = -offset;
                     }
                 }
-
-                int i = 0;
-                for (AMQQueue q : destinationQueues)
+                for (int i = offset; i < queueCount; i++)
                 {
-                    if(++i > offset)
-                    {
-                        // Increment the references to this message for each 
queue delivery.
-                        message.incrementReference();
-                        // normal deliver so add this message at the end.
-                        _txnContext.deliver(q, message);
-                    }
+                    // normal deliver so add this message at the end.
+                    _txnContext.deliver(_destinationQueues.get(i), message);
                 }
-                i = 0;
-                if(offset != 0)
+                for (int i = 0; i < offset; i++)
                 {
-                    for (AMQQueue q : destinationQueues)
-                    {
-                        if(i++ < offset)
-                        {
-                            // Increment the references to this message for 
each queue delivery.
-                            message.incrementReference();
-                            // normal deliver so add this message at the end.
-                            _txnContext.deliver(q, message);
-                        }
-                    }
+                    // normal deliver so add this message at the end.
+                    _txnContext.deliver(_destinationQueues.get(i), message);
                 }
-
             }
 
             // we then allow the transactional context to do something with 
the message content
@@ -329,7 +300,7 @@
         _exchange.route(this);
     }
 
-    public void enqueue(final Collection<AMQQueue> queues)
+    public void enqueue(final ArrayList<AMQQueue> queues)
     {
         _destinationQueues = queues;
     }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
 Thu Aug  7 04:15:01 2008
@@ -27,23 +27,23 @@
         GRANTED, REFUSED
     }
 
-    StringBuilder _authorizer;
-    AccessStatus _status;
+    private String _authorizer;
+    private AccessStatus _status;
 
     public AccessResult(ACLPlugin authorizer, AccessStatus status)
     {
         _status = status;
-        _authorizer = new StringBuilder(authorizer.getPluginName());
+        _authorizer = authorizer.getPluginName();
     }
 
     public void setAuthorizer(ACLPlugin authorizer)
     {
-        _authorizer.append(authorizer.getPluginName());
+        _authorizer += authorizer.getPluginName();
     }
 
     public String getAuthorizer()
     {
-        return _authorizer.toString();
+        return _authorizer;
     }
 
     public void setStatus(AccessStatus status)
@@ -58,8 +58,7 @@
 
     public void addAuthorizer(ACLPlugin accessManager)
     {
-        _authorizer.insert(0, "->");
-        _authorizer.insert(0, accessManager.getPluginName());
+        _authorizer = accessManager.getPluginName() + "->" + _authorizer;
     }
 
 

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 Thu Aug  7 04:15:01 2008
@@ -43,6 +43,8 @@
 import org.apache.mina.common.ByteBuffer;
 
 import javax.management.Notification;
+
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Collections;
 
@@ -304,7 +306,9 @@
         for (int i = 0; i < messages.length; i++)
         {
             messages[i] = message(false, size);
-            messages[i].enqueue(Collections.singleton(_queue));
+            ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+            qs.add(_queue);
+            messages[i].enqueue(qs);
             messages[i].routingComplete(_messageStore, new 
MessageHandleFactory());
 
         }

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
 Thu Aug  7 04:15:01 2008
@@ -47,6 +47,8 @@
 import org.apache.mina.common.ByteBuffer;
 
 import javax.management.JMException;
+
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Collections;
 
@@ -216,8 +218,9 @@
         IncomingMessage msg = message(false, false);
         long id = msg.getMessageId();
         _queue.clearQueue(_storeContext);
-
-        msg.enqueue(Collections.singleton(_queue));
+        ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+        qs.add(_queue);
+        msg.enqueue(qs);
         msg.routingComplete(_messageStore, new MessageHandleFactory());
 
         msg.addContentBodyFrame(new ContentChunk()
@@ -319,7 +322,9 @@
         for (int i = 0; i < messageCount; i++)
         {
             IncomingMessage currentMessage = message(false, persistent);
-            currentMessage.enqueue(Collections.singleton(_queue));
+            ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+            qs.add(_queue);
+            currentMessage.enqueue(qs);
 
             // route header
             currentMessage.routingComplete(_messageStore, new 
MessageHandleFactory());

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
 Thu Aug  7 04:15:01 2008
@@ -40,6 +40,7 @@
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.util.NullApplicationRegistry;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Set;
 import java.util.Collections;
@@ -146,7 +147,9 @@
             // we increment the reference here since we are not delivering the 
messaging to any queues, which is where
             // the reference is normally incremented. The test is easier to 
construct if we have direct access to the
             // subscription
-            msg.enqueue(Collections.singleton(_queue));
+            ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+            qs.add(_queue);
+            msg.enqueue(qs);
             msg.routingComplete(_messageStore, factory);
             if(msg.allContentReceived())
             {


Reply via email to