http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java index 75faa97..57865b7 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java @@ -154,7 +154,7 @@ public class AMQConsumer { } addressInfo.setInternal(internalAddress); if (isDurable) { - queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName)); + queueName = org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName); QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName); if (result.isExists()) { // Already exists
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 64d1353..bca7eae 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -27,6 +27,7 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; @@ -82,6 +83,8 @@ public class AMQSession implements SessionCallback { private final OpenWireProtocolManager protocolManager; + private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); + public AMQSession(ConnectionInfo connInfo, SessionInfo sessInfo, ActiveMQServer server, @@ -295,7 +298,7 @@ public class AMQSession implements SessionCallback { } @Override - public void disconnect(ServerConsumer consumerId, String queueName) { + public void disconnect(ServerConsumer consumerId, SimpleString queueName) { // TODO Auto-generated method stub } @@ -315,7 +318,7 @@ public class AMQSession implements SessionCallback { actualDestinations = new ActiveMQDestination[]{destination}; } - org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend); + org.apache.activemq.artemis.api.core.Message originalCoreMsg = getConverter().inbound(messageSend, coreMessageObjectPools); originalCoreMsg.putStringProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME.toString(), this.connection.getState().getInfo().getClientId()); @@ -338,12 +341,12 @@ public class AMQSession implements SessionCallback { for (int i = 0; i < actualDestinations.length; i++) { ActiveMQDestination dest = actualDestinations[i]; - SimpleString address = new SimpleString(dest.getPhysicalName()); + SimpleString address = SimpleString.toSimpleString(dest.getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()); org.apache.activemq.artemis.api.core.Message coreMsg = originalCoreMsg.copy(); coreMsg.setAddress(address); if (actualDestinations[i].isQueue()) { - checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary()); + checkAutoCreateQueue(SimpleString.toSimpleString(actualDestinations[i].getPhysicalName(), coreMessageObjectPools.getAddressStringSimpleStringPool()), actualDestinations[i].isTemporary()); coreMsg.setRoutingType(RoutingType.ANYCAST); } else { coreMsg.setRoutingType(RoutingType.MULTICAST); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 18e0b10..0a12b47 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -239,7 +239,7 @@ public class StompSession implements SessionCallback { } @Override - public void disconnect(ServerConsumer consumerId, String queueName) { + public void disconnect(ServerConsumer consumerId, SimpleString queueName) { StompSubscription stompSubscription = subscriptions.remove(consumerId.getID()); if (stompSubscription != null) { StompFrame frame = connection.getFrameHandler().createStompFrame(Stomp.Responses.ERROR); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java ---------------------------------------------------------------------- diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java index 9133cdf..f343ec9 100644 --- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java +++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java @@ -111,7 +111,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList // Create the message consumer SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector); if (activation.isTopic() && spec.isSubscriptionDurable()) { - SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName())); + SimpleString queueName = ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName()); QueueQuery subResponse = session.queueQuery(queueName); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java index 2276fdb..d38f45f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ServerPacketDecoder.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.core.protocol; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.Packet; @@ -54,7 +53,6 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReq import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendLargeMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionSendMessage_1X; -import org.apache.activemq.artemis.utils.collections.TypedProperties; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.BACKUP_REQUEST_RESPONSE; @@ -85,34 +83,15 @@ import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SES public class ServerPacketDecoder extends ClientPacketDecoder { - private static final int UUID_LENGTH = 36; - private static final int DEFAULT_INTERNER_CAPACITY = 32; private static final long serialVersionUID = 3348673114388400766L; - private SimpleString.Interner keysInterner; - private TypedProperties.StringValue.Interner valuesInterner; - - public ServerPacketDecoder() { - this.keysInterner = null; - this.valuesInterner = null; - } - - private void initializeInternersIfNeeded() { - if (this.keysInterner == null) { - this.keysInterner = new SimpleString.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH); - } - if (this.valuesInterner == null) { - this.valuesInterner = new TypedProperties.StringValue.Interner(DEFAULT_INTERNER_CAPACITY, UUID_LENGTH); - } - } private SessionSendMessage decodeSessionSendMessage(final ActiveMQBuffer in, CoreRemotingConnection connection) { final SessionSendMessage sendMessage; - initializeInternersIfNeeded(); if (connection.isVersionBeforeAddressChange()) { - sendMessage = new SessionSendMessage_1X(new CoreMessage(this.keysInterner, this.valuesInterner)); + sendMessage = new SessionSendMessage_1X(new CoreMessage(this.coreMessageObjectPools)); } else { - sendMessage = new SessionSendMessage(new CoreMessage(this.keysInterner, this.valuesInterner)); + sendMessage = new SessionSendMessage(new CoreMessage(this.coreMessageObjectPools)); } sendMessage.decode(in); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java index 8b281eb..f53d028 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Packet; import org.apache.activemq.artemis.core.protocol.core.ServerSessionPacketHandler; @@ -48,6 +49,8 @@ public final class CoreSessionCallback implements SessionCallback { private ServerSessionPacketHandler handler; + private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools(); + public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel, @@ -115,9 +118,9 @@ public final class CoreSessionCallback implements SessionCallback { Packet packet; if (channel.getConnection().isVersionBeforeAddressChange()) { - packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(), deliveryCount); + packet = new SessionReceiveMessage_1X(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount); } else { - packet = new SessionReceiveMessage(consumer.getID(), message.toCore(), deliveryCount); + packet = new SessionReceiveMessage(consumer.getID(), message.toCore(coreMessageObjectPools), deliveryCount); } int size = 0; @@ -159,11 +162,11 @@ public final class CoreSessionCallback implements SessionCallback { } @Override - public void disconnect(ServerConsumer consumerId, String queueName) { + public void disconnect(ServerConsumer consumerId, SimpleString queueName) { if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) { channel.send(new DisconnectConsumerMessage(consumerId.getID())); } else { - ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName); + ActiveMQServerLogger.LOGGER.warnDisconnectOldClient(queueName.toString()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 5dc1b93..15b1465 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -1045,7 +1045,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { @Override public void disconnect() { - callback.disconnect(this, getQueue().getName().toString()); + callback.disconnect(this, getQueue().getName()); } public float getRate() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java index a440e31..2c81343 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/MessageConverter.java @@ -18,10 +18,11 @@ package org.apache.activemq.artemis.spi.core.protocol; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; public interface MessageConverter<ProtocolMessage extends Message> { - ICoreMessage toCore(ProtocolMessage pureMessage) throws Exception; + ICoreMessage toCore(ProtocolMessage pureMessage, CoreMessageObjectPools coreMessageObjectPools) throws Exception; ProtocolMessage fromCore(ICoreMessage coreMessage) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java index ae1612f..c4a2dbe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java @@ -81,7 +81,7 @@ public interface SessionCallback { void closed(); - void disconnect(ServerConsumer consumerId, String queueName); + void disconnect(ServerConsumer consumerId, SimpleString queueName); boolean isWritable(ReadyListener callback, Object protocolContext); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 2707190..5cfac12 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.message.impl.CoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.Persister; @@ -334,6 +335,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { @Override public CoreMessage toCore() { + return toCore(null); + } + + @Override + public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { return null; } @@ -591,6 +597,11 @@ public class ScheduledDeliveryHandlerTest extends Assert { } @Override + public Message putStringProperty(SimpleString key, String value) { + return null; + } + + @Override public Message putStringProperty(String key, String value) { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java index d7c9855..078c397 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AcknowledgeTest.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; +import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQConsumerContext; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -386,6 +387,11 @@ public class AcknowledgeTest extends ActiveMQTestBase { @Override public ICoreMessage toCore() { + return toCore(null); + } + + @Override + public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) { return null; } @@ -648,6 +654,11 @@ public class AcknowledgeTest extends ActiveMQTestBase { } @Override + public Message putStringProperty(SimpleString key, String value) { + return null; + } + + @Override public Message putStringProperty(String key, String value) { return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 2f25480..dc57a12 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -585,7 +585,7 @@ public class HangConsumerTest extends ActiveMQTestBase { } @Override - public void disconnect(ServerConsumer consumerId, String queueName) { + public void disconnect(ServerConsumer consumerId, SimpleString queueName) { //To change body of implemented methods use File | Settings | File Templates. } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98028cde/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java index 790ed82..aaf29b0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/XmlImportExportTest.java @@ -128,7 +128,7 @@ public class XmlImportExportTest extends ActiveMQTestBase { msg.putStringProperty("myNonAsciiStringProperty", international.toString()); msg.putStringProperty("mySpecialCharacters", special); msg.putStringProperty(new SimpleString("mySimpleStringProperty"), new SimpleString("mySimpleStringPropertyValue_" + i)); - msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), null); + msg.putStringProperty(new SimpleString("myNullSimpleStringProperty"), (SimpleString) null); producer.send(msg); }