Author: ritchiem
Date: Tue Jul 17 04:38:10 2007
New Revision: 556890
URL: http://svn.apache.org/viewvc?view=rev&rev=556890
Log:
QPID-541: A large portion of memory was being wasted in 32k ByteBuffers
held by AMQShortStrings.
Patch submitted by Robert Godfrey to intern() the AMQShortStrings to reduce
memory usage. The current implementation *will* impact performance because it
uses a static Map for storage. However, a thread-local implementation is in the works.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
Tue Jul 17 04:38:10 2007
@@ -98,6 +98,12 @@
}
else
{
+
+ if (body.consumerTag != null)
+ {
+ body.consumerTag = body.consumerTag.intern();
+ }
+
try
{
AMQShortString consumerTag =
channel.subscribeToQueue(body.consumerTag, queue, session, !body.noAck,
@@ -136,15 +142,15 @@
// If the above doesn't work then perhaps this is wrong
too.
// throw
body.getConnectionException(AMQConstant.NOT_ALLOWED,
// "Non-unique consumer
tag, '" + body.consumerTag + "'");
- // AMQP version change: Hardwire the
version to 0-8 (major=8, minor=0)
+ // AMQP version change: Hardwire the version to 0-8
(major=8, minor=0)
// TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as
versions change.
session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
- (byte)8, (byte)0, // AMQP version (major, minor)
- BasicConsumeBody.getClazz((byte)8, (byte)0), //
classId
- BasicConsumeBody.getMethod((byte)8, (byte)0), //
methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- msg)); // replyText
+
(byte) 8, (byte) 0, // AMQP version (major, minor)
+
BasicConsumeBody.getClazz((byte) 8, (byte) 0), // classId
+
BasicConsumeBody.getMethod((byte) 8, (byte) 0), // methodId
+
AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+
msg)); // replyText
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
Tue Jul 17 04:38:10 2007
@@ -67,6 +67,10 @@
body.exchange = ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
}
+ else
+ {
+ body.exchange = body.exchange.intern();
+ }
VirtualHost vHost = session.getVirtualHost();
Exchange e = vHost.getExchangeRegistry().getExchange(body.exchange);
// if the exchange does not exist we raise a channel exception
@@ -86,10 +90,16 @@
throw body.getChannelNotFoundException(evt.getChannelId());
}
+ if(body.routingKey != null)
+ {
+ body.routingKey = body.routingKey.intern();
+ }
+
MessagePublishInfo info =
session.getRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
channel.setPublishFrame(info, session);
}
}
}
+
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
Tue Jul 17 04:38:10 2007
@@ -83,7 +83,9 @@
try
{
- exchange = exchangeFactory.createExchange(body.exchange,
body.type, body.durable,
+ exchange = exchangeFactory.createExchange(body.exchange ==
null ? null : body.exchange.intern(),
+ body.type ==
null ? null : body.type.intern(),
+ body.durable,
body.passive,
body.ticket);
exchangeRegistry.registerExchange(exchange);
}
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
Tue Jul 17 04:38:10 2007
@@ -97,6 +97,12 @@
{
throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange "
+ body.exchange + " does not exist.");
}
+
+ if (body.routingKey != null)
+ {
+ body.routingKey = body.routingKey.intern();
+ }
+
try
{
if (!exch.isBound(body.routingKey, body.arguments, queue))
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
Tue Jul 17 04:38:10 2007
@@ -91,8 +91,15 @@
synchronized (queueRegistry)
{
+
+
if (((queue = queueRegistry.getQueue(body.queue)) == null))
{
+ if(body.queue != null)
+ {
+ body.queue = body.queue.intern();
+ }
+
if (body.passive)
{
String msg = "Queue: " + body.queue + " not found on
VirtualHost(" + virtualHost + ").";
Modified:
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?view=diff&rev=556890&r1=556889&r2=556890
==============================================================================
---
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
(original)
+++
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
Tue Jul 17 04:38:10 2007
@@ -26,6 +26,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.lang.ref.WeakReference;
+
/**
* A short string is a representation of an AMQ Short String
* Short strings differ from the Java String class by being limited to on
ASCII characters (0-127)
@@ -34,6 +38,10 @@
*/
public final class AMQShortString implements CharSequence,
Comparable<AMQShortString>
{
+
+ private static final Map<AMQShortString, WeakReference<AMQShortString>>
internMap =
+ new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
+
private static final Logger _logger =
LoggerFactory.getLogger(AMQShortString.class);
private final ByteBuffer _data;
@@ -43,7 +51,6 @@
public AMQShortString(byte[] data)
{
-
_data = ByteBuffer.wrap(data);
_length = data.length;
}
@@ -374,6 +381,29 @@
}
return (length() == name.length()) ? 0 : -1;
+ }
+ }
+
+
+ public AMQShortString intern()
+ {
+ hashCode();
+ synchronized(internMap)
+ {
+
+ WeakReference<AMQShortString> ref = internMap.get(this);
+ if(ref != null)
+ {
+ AMQShortString internString = ref.get();
+ if(internString != null)
+ {
+ return internString;
+ }
+ }
+
+ AMQShortString internString = new AMQShortString(getBytes());
+ internMap.put(internString, new WeakReference(internString));
+ return internString;
}
}
}