Author: aidan
Date: Thu Aug 7 04:15:01 2008
New Revision: 683583
URL: http://svn.apache.org/viewvc?rev=683583&view=rev
Log:
QPID-1218: Boost broker performance by lots.
AMQMessage: Allow references to be incremented in a pile
IncomingMessage: Increment message references in one go, flatten delivery loop
a little.
Make _destinationQueues an ArrayList, massively increasing performance. Iter
ate through it with indexing
AccessResult: don't use StringBuilder so much
Update tests and exchanges to reflect new API usage, almost all of this is just
type narrowing except for Topic where there's an extra copy, but it isn't too
bad relative to the number of HashSet and HashMap operations that go on inside
there.
Modified:
incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
Modified:
incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
Thu Aug 7 04:15:01 2008
@@ -204,7 +204,7 @@
((BasicContentHeaderProperties)payload.getContentHeaderBody().properties).setHeaders(headers);
AMQQueue q = getQueueRegistry().getQueue(new
AMQShortString("diagnosticqueue"));
- Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
queues.add(q);
payload.enqueue(queues);
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
Thu Aug 7 04:15:01 2008
@@ -191,7 +191,7 @@
final AMQShortString routingKey = payload.getRoutingKey() == null ?
AMQShortString.EMPTY_STRING : payload.getRoutingKey();
- final List<AMQQueue> queues = (routingKey == null) ? null :
_index.get(routingKey);
+ final ArrayList<AMQQueue> queues = (routingKey == null) ? null :
_index.get(routingKey);
if (_logger.isDebugEnabled())
{
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
Thu Aug 7 04:15:01 2008
@@ -249,7 +249,7 @@
_logger.debug("Exchange " + getName() + ": routing message with
headers " + headers);
}
boolean routed = false;
- Collection<AMQQueue> queues = new ArrayList<AMQQueue>();
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
for (Registration e : _bindings)
{
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
Thu Aug 7 04:15:01 2008
@@ -37,12 +37,12 @@
*/
class Index
{
- private ConcurrentMap<AMQShortString, List<AMQQueue>> _index
- = new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
+ private ConcurrentMap<AMQShortString, ArrayList<AMQQueue>> _index
+ = new ConcurrentHashMap<AMQShortString, ArrayList<AMQQueue>>();
synchronized boolean add(AMQShortString key, AMQQueue queue)
{
- List<AMQQueue> queues = _index.get(key);
+ ArrayList<AMQQueue> queues = _index.get(key);
if(queues == null)
{
queues = new ArrayList<AMQQueue>();
@@ -66,7 +66,7 @@
synchronized boolean remove(AMQShortString key, AMQQueue queue)
{
- List<AMQQueue> queues = _index.get(key);
+ ArrayList<AMQQueue> queues = _index.get(key);
if (queues != null)
{
queues = new ArrayList<AMQQueue>(queues);
@@ -87,7 +87,7 @@
return false;
}
- List<AMQQueue> get(AMQShortString key)
+ ArrayList<AMQQueue> get(AMQShortString key)
{
return _index.get(key);
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
Thu Aug 7 04:15:01 2008
@@ -32,7 +32,6 @@
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
@@ -48,9 +47,6 @@
import javax.management.openmbean.TabularDataSupport;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicInteger;
import java.lang.ref.WeakReference;
public class TopicExchange extends AbstractExchange
@@ -532,7 +528,10 @@
final AMQShortString routingKey = payload.getRoutingKey();
- Collection<AMQQueue> queues = getMatchedQueues(payload, routingKey);
+ // The copy here is unfortunate, but not too bad relevant to the
amount of
+ // things created and copied in getMatchedQueues
+ ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
+ queues.addAll(getMatchedQueues(payload, routingKey));
if(queues == null || queues.isEmpty())
{
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Thu Aug 7 04:15:01 2008
@@ -291,12 +291,17 @@
return this;
}
- /** Threadsafe. Increment the reference count on the message. */
public boolean incrementReference()
{
- if(_referenceCount.incrementAndGet() <= 1)
+ return incrementReference(1);
+ }
+
+ /* Threadsafe. Increment the reference count on the message. */
+ public boolean incrementReference(int count)
+ {
+ if(_referenceCount.addAndGet(count) <= 1)
{
- _referenceCount.decrementAndGet();
+ _referenceCount.addAndGet(-count);
return false;
}
else
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
Thu Aug 7 04:15:01 2008
@@ -34,6 +34,7 @@
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
+import java.util.ArrayList;
import java.util.Collection;
public class IncomingMessage implements Filterable<RuntimeException>
@@ -63,7 +64,7 @@
* delivered. It is <b>cleared after delivery has been attempted</b>. Any
persistent record of destinations is done
* by the message handle.
*/
- private Collection<AMQQueue> _destinationQueues;
+ private ArrayList<AMQQueue> _destinationQueues;
private AMQProtocolSession _publisher;
private MessageStore _messageStore;
@@ -134,21 +135,13 @@
if(_destinationQueues != null)
{
- for (AMQQueue q : _destinationQueues)
+ for (int i = 0; i < _destinationQueues.size(); i++)
{
- if(q.isDurable())
- {
-
-
_messageStore.enqueueMessage(_txnContext.getStoreContext(), q, _messageId);
- }
+ _messageStore.enqueueMessage(_txnContext.getStoreContext(),
+ _destinationQueues.get(i), _messageId);
}
}
-
}
-
-
-
-
}
public AMQMessage deliverToQueues()
@@ -157,10 +150,9 @@
// we get a reference to the destination queues now so that we can
clear the
// transient message data as quickly as possible
- Collection<AMQQueue> destinationQueues = _destinationQueues;
if (_logger.isDebugEnabled())
{
- _logger.debug("Delivering message " + _messageId + " to " +
destinationQueues);
+ _logger.debug("Delivering message " + _messageId + " to " +
_destinationQueues);
}
AMQMessage message = null;
@@ -178,10 +170,7 @@
message.setExpiration(_expiration);
message.setClientIdentifier(_publisher.getSessionIdentifier());
-
-
-
- if ((destinationQueues == null) || destinationQueues.isEmpty())
+ if ((_destinationQueues == null) || _destinationQueues.size() == 0)
{
if (isMandatory() || isImmediate())
@@ -196,10 +185,9 @@
}
else
{
- // TODO
-
int offset;
- final int queueCount = destinationQueues.size();
+ final int queueCount = _destinationQueues.size();
+ message.incrementReference(queueCount);
if(queueCount == 1)
{
offset = 0;
@@ -212,33 +200,16 @@
offset = -offset;
}
}
-
- int i = 0;
- for (AMQQueue q : destinationQueues)
+ for (int i = offset; i < queueCount; i++)
{
- if(++i > offset)
- {
- // Increment the references to this message for each
queue delivery.
- message.incrementReference();
- // normal deliver so add this message at the end.
- _txnContext.deliver(q, message);
- }
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(_destinationQueues.get(i), message);
}
- i = 0;
- if(offset != 0)
+ for (int i = 0; i < offset; i++)
{
- for (AMQQueue q : destinationQueues)
- {
- if(i++ < offset)
- {
- // Increment the references to this message for
each queue delivery.
- message.incrementReference();
- // normal deliver so add this message at the end.
- _txnContext.deliver(q, message);
- }
- }
+ // normal deliver so add this message at the end.
+ _txnContext.deliver(_destinationQueues.get(i), message);
}
-
}
// we then allow the transactional context to do something with
the message content
@@ -329,7 +300,7 @@
_exchange.route(this);
}
- public void enqueue(final Collection<AMQQueue> queues)
+ public void enqueue(final ArrayList<AMQQueue> queues)
{
_destinationQueues = queues;
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java
Thu Aug 7 04:15:01 2008
@@ -27,23 +27,23 @@
GRANTED, REFUSED
}
- StringBuilder _authorizer;
- AccessStatus _status;
+ private String _authorizer;
+ private AccessStatus _status;
public AccessResult(ACLPlugin authorizer, AccessStatus status)
{
_status = status;
- _authorizer = new StringBuilder(authorizer.getPluginName());
+ _authorizer = authorizer.getPluginName();
}
public void setAuthorizer(ACLPlugin authorizer)
{
- _authorizer.append(authorizer.getPluginName());
+ _authorizer += authorizer.getPluginName();
}
public String getAuthorizer()
{
- return _authorizer.toString();
+ return _authorizer;
}
public void setStatus(AccessStatus status)
@@ -58,8 +58,7 @@
public void addAuthorizer(ACLPlugin accessManager)
{
- _authorizer.insert(0, "->");
- _authorizer.insert(0, accessManager.getPluginName());
+ _authorizer = accessManager.getPluginName() + "->" + _authorizer;
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
Thu Aug 7 04:15:01 2008
@@ -43,6 +43,8 @@
import org.apache.mina.common.ByteBuffer;
import javax.management.Notification;
+
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Collections;
@@ -304,7 +306,9 @@
for (int i = 0; i < messages.length; i++)
{
messages[i] = message(false, size);
- messages[i].enqueue(Collections.singleton(_queue));
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ messages[i].enqueue(qs);
messages[i].routingComplete(_messageStore, new
MessageHandleFactory());
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
Thu Aug 7 04:15:01 2008
@@ -47,6 +47,8 @@
import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
+
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Collections;
@@ -216,8 +218,9 @@
IncomingMessage msg = message(false, false);
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
-
- msg.enqueue(Collections.singleton(_queue));
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ msg.enqueue(qs);
msg.routingComplete(_messageStore, new MessageHandleFactory());
msg.addContentBodyFrame(new ContentChunk()
@@ -319,7 +322,9 @@
for (int i = 0; i < messageCount; i++)
{
IncomingMessage currentMessage = message(false, persistent);
- currentMessage.enqueue(Collections.singleton(_queue));
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ currentMessage.enqueue(qs);
// route header
currentMessage.routingComplete(_messageStore, new
MessageHandleFactory());
Modified:
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?rev=683583&r1=683582&r2=683583&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
(original)
+++
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
Thu Aug 7 04:15:01 2008
@@ -40,6 +40,7 @@
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Set;
import java.util.Collections;
@@ -146,7 +147,9 @@
// we increment the reference here since we are not delivering the
messaging to any queues, which is where
// the reference is normally incremented. The test is easier to
construct if we have direct access to the
// subscription
- msg.enqueue(Collections.singleton(_queue));
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue);
+ msg.enqueue(qs);
msg.routingComplete(_messageStore, factory);
if(msg.allContentReceived())
{