Fixing converters part I
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b557f2df Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b557f2df Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b557f2df Branch: refs/heads/artemis-1009 Commit: b557f2df6d0287a21adfaf67f6e93875ba174b5b Parents: 9d3260b Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Thu Mar 2 18:57:23 2017 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Mar 2 19:38:06 2017 -0500 ---------------------------------------------------------------------- .../cli/commands/tools/XmlDataExporter.java | 32 +- .../cli/commands/tools/XmlDataExporterUtil.java | 9 +- .../cli/commands/tools/XmlDataImporter.java | 7 +- .../activemq/artemis/api/core/ICoreMessage.java | 87 +++ .../activemq/artemis/api/core/Message.java | 96 +-- .../artemis/api/core/client/ClientMessage.java | 61 +- .../api/core/management/ManagementHelper.java | 18 +- .../core/client/impl/ClientConsumerImpl.java | 2 +- .../core/client/impl/ClientProducerImpl.java | 18 +- .../artemis/core/message/impl/CoreMessage.java | 73 +-- .../core/impl/ActiveMQSessionContext.java | 3 +- .../core/impl/wireformat/MessagePacket.java | 9 +- .../impl/wireformat/SessionReceiveMessage.java | 8 +- .../impl/wireformat/SessionSendMessage.java | 4 +- .../spi/core/remoting/SessionContext.java | 5 +- .../api/jms/management/JMSManagementHelper.java | 3 +- .../jms/transaction/JMSTransactionDetail.java | 6 +- .../protocol/amqp/broker/AMQPMessage.java | 91 +-- .../amqp/broker/AMQPSessionCallback.java | 16 +- .../amqp/broker/ProtonProtocolManager.java | 12 +- .../protocol/amqp/converter/AMQPConverter.java | 44 ++ .../amqp/converter/AmqpCoreConverter.java | 366 +++++++++++ .../amqp/converter/CoreAmqpConverter.java | 565 +++++++++++++++++ .../amqp/converter/ProtonMessageConverter.java | 101 ---- .../converter/jms/ServerJMSBytesMessage.java | 6 +- .../amqp/converter/jms/ServerJMSMapMessage.java | 5 +- .../amqp/converter/jms/ServerJMSMessage.java | 64 +- .../converter/jms/ServerJMSObjectMessage.java | 8 +- .../converter/jms/ServerJMSStreamMessage.java | 5 +- .../converter/jms/ServerJMSTextMessage.java | 5 +- .../converter/message/AMQPMessageSupport.java | 76 +-- .../converter/message/AMQPMessageTypes.java | 30 - .../message/AMQPNativeOutboundTransformer.java | 80 --- .../amqp/converter/message/EncodedMessage.java | 67 --- .../converter/message/InboundTransformer.java | 240 -------- .../message/JMSMappingInboundTransformer.java | 182 ------ .../message/JMSMappingOutboundTransformer.java | 574 ------------------ .../converter/message/OutboundTransformer.java | 53 -- .../amqp/proton/ProtonServerSenderContext.java | 32 +- .../amqp/converter/TestConversions.java | 599 +------------------ .../JMSMappingInboundTransformerTest.java | 166 ++--- .../JMSMappingOutboundTransformerTest.java | 204 ++----- .../JMSTransformationSpeedComparisonTest.java | 86 ++- .../message/MessageTransformationTest.java | 116 +--- .../protocol/amqp/message/AMQPMessageTest.java | 2 +- .../core/protocol/mqtt/MQTTProtocolManager.java | 6 - .../artemis/core/protocol/mqtt/MQTTUtil.java | 5 +- .../openwire/OpenWireMessageConverter.java | 19 +- .../openwire/OpenWireProtocolManager.java | 8 +- .../core/protocol/openwire/OpenwireMessage.java | 473 +++++++++++++++ .../core/protocol/openwire/amq/AMQConsumer.java | 5 +- .../core/protocol/openwire/amq/AMQSession.java | 5 +- .../core/protocol/stomp/StompConnection.java | 6 +- .../protocol/stomp/StompProtocolManager.java | 8 - .../core/protocol/stomp/StompSession.java | 4 +- .../stomp/VersionedStompFrameHandler.java | 5 +- .../stomp/v12/StompFrameHandlerV12.java | 4 +- .../impl/openmbean/OpenTypeSupport.java | 12 +- .../activemq/artemis/core/paging/impl/Page.java | 3 +- .../core/paging/impl/PagedMessageImpl.java | 5 +- .../core/ServerSessionPacketHandler.java | 3 +- .../protocol/core/impl/CoreProtocolManager.java | 13 +- .../protocol/core/impl/CoreSessionCallback.java | 5 +- .../artemis/core/server/LargeServerMessage.java | 4 +- .../artemis/core/server/ServerConsumer.java | 3 +- .../core/server/impl/ServerConsumerImpl.java | 11 +- .../server/management/ManagementService.java | 6 +- .../management/impl/ManagementServiceImpl.java | 5 +- .../transaction/impl/CoreTransactionDetail.java | 7 +- .../spi/core/protocol/MessageConverter.java | 7 +- .../spi/core/protocol/ProtocolManager.java | 10 +- .../group/impl/ClusteredResetMockTest.java | 6 +- .../impl/ScheduledDeliveryHandlerTest.java | 56 +- .../artemis/tests/util/ActiveMQTestBase.java | 3 +- .../integration/client/AcknowledgeTest.java | 62 +- .../tests/integration/client/ConsumerTest.java | 43 +- .../integration/client/LargeMessageTest.java | 3 +- .../integration/clientcrash/ClientExitTest.java | 4 +- .../integration/journal/MessageJournalTest.java | 5 +- .../management/ManagementHelperTest.java | 8 +- .../management/ManagementServiceImplTest.java | 19 +- .../integration/paging/PagingSendTest.java | 3 +- .../tests/integration/paging/PagingTest.java | 4 +- .../tests/integration/server/ScaleDownTest.java | 4 +- .../ssl/CoreClientOverOneWaySSLTest.java | 4 +- .../ssl/CoreClientOverTwoWaySSLTest.java | 5 +- .../unit/core/message/impl/MessageImplTest.java | 5 +- .../tests/unit/core/paging/impl/PageTest.java | 4 +- .../core/paging/impl/PagingManagerImplTest.java | 14 +- .../core/paging/impl/PagingStoreImplTest.java | 24 +- 90 files changed, 2136 insertions(+), 3018 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java index b57b5c5..b53db48 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporter.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.artemis.cli.commands.tools; +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; import java.io.File; import java.io.OutputStream; import java.lang.reflect.InvocationHandler; @@ -33,14 +36,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import javax.xml.stream.XMLOutputFactory; -import javax.xml.stream.XMLStreamException; -import javax.xml.stream.XMLStreamWriter; - +import io.airlift.airline.Command; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.config.Configuration; @@ -74,8 +76,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Persisten import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.LargeServerMessage; -import org.apache.activemq.artemis.api.core.RoutingType; - import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.HierarchicalObjectRepository; @@ -83,8 +83,6 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.OrderedExecutorFactory; -import io.airlift.airline.Command; - @Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.") public final class XmlDataExporter extends OptionalLocking { @@ -361,13 +359,13 @@ public final class XmlDataExporter extends OptionalLocking { xmlWriter.writeEndElement(); // end BINDINGS_PARENT } - private void printAllMessagesAsXML() throws XMLStreamException { + private void printAllMessagesAsXML() throws Exception { xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_PARENT); // Order here is important. We must process the messages from the journal before we process those from the page // files in order to get the messages in the right order. for (Map.Entry<Long, Message> messageMapEntry : messages.entrySet()) { - printSingleMessageAsXML(messageMapEntry.getValue(), extractQueueNames(messageRefs.get(messageMapEntry.getKey()))); + printSingleMessageAsXML(messageMapEntry.getValue().toCore(), extractQueueNames(messageRefs.get(messageMapEntry.getKey()))); } printPagedMessagesAsXML(); @@ -441,7 +439,7 @@ public final class XmlDataExporter extends OptionalLocking { } if (queueNames.size() > 0 && (message.getTransactionID() == -1 || pgTXs.contains(message.getTransactionID()))) { - printSingleMessageAsXML(message.getMessage(), queueNames); + printSingleMessageAsXML(message.getMessage().toCore(), queueNames); } messageId++; @@ -458,20 +456,20 @@ public final class XmlDataExporter extends OptionalLocking { } } - private void printSingleMessageAsXML(Message message, List<String> queues) throws XMLStreamException { + private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception { xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD); printMessageAttributes(message); printMessageProperties(message); printMessageQueues(queues); - printMessageBody(message); + printMessageBody(message.toCore()); xmlWriter.writeEndElement(); // end MESSAGES_CHILD messagesPrinted++; } - private void printMessageBody(Message message) throws XMLStreamException { + private void printMessageBody(Message message) throws Exception { xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY); - if (message.isLargeMessage()) { + if (message.toCore().isLargeMessage()) { printLargeMessageBody((LargeServerMessage) message); } else { xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message)); @@ -484,7 +482,7 @@ public final class XmlDataExporter extends OptionalLocking { LargeBodyEncoder encoder = null; try { - encoder = message.getBodyEncoder(); + encoder = message.toCore().getBodyEncoder(); encoder.open(); long totalBytesWritten = 0; Long bufferSize; @@ -541,7 +539,7 @@ public final class XmlDataExporter extends OptionalLocking { xmlWriter.writeEndElement(); // end PROPERTIES_PARENT } - private void printMessageAttributes(Message message) throws XMLStreamException { + private void printMessageAttributes(ICoreMessage message) throws XMLStreamException { xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID())); xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority())); xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration())); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java index a3807bd..ca7f1a8 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataExporterUtil.java @@ -17,9 +17,9 @@ package org.apache.activemq.artemis.cli.commands.tools; import com.google.common.base.Preconditions; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.utils.Base64; /** @@ -91,12 +91,13 @@ public class XmlDataExporterUtil { * @param message * @return */ - public static String encodeMessageBody(final Message message) { + public static String encodeMessageBody(final Message message) throws Exception { Preconditions.checkNotNull(message, "ServerMessage can not be null"); - int size = ((CoreMessage)message.toCore()).getEndOfBodyPosition() - message.getBodyBuffer().readerIndex(); + ICoreMessage coreMessage = message.toCore(); + int size = coreMessage.getEndOfBodyPosition() - coreMessage.getBodyBuffer().readerIndex(); byte[] buffer = new byte[size]; - message.getBodyBuffer().readBytes(buffer); + message.toCore().getBodyBuffer().readBytes(buffer); return XmlDataExporterUtil.encode(buffer); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java ---------------------------------------------------------------------- diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java index 0f06738..518d231 100644 --- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java +++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/XmlDataImporter.java @@ -45,7 +45,9 @@ import java.util.UUID; import io.airlift.airline.Command; import io.airlift.airline.Option; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; @@ -62,7 +64,6 @@ import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.utils.Base64; import org.apache.activemq.artemis.utils.ClassloadingUtil; import org.apache.activemq.artemis.utils.ListUtil; @@ -297,7 +298,7 @@ public final class XmlDataImporter extends ActionAbstract { switch (eventType) { case XMLStreamConstants.START_ELEMENT: if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) { - processMessageBody(message); + processMessageBody(message.toCore()); } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) { processMessageProperties(message); } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) { @@ -468,7 +469,7 @@ public final class XmlDataImporter extends ActionAbstract { } } - private void processMessageBody(final Message message) throws XMLStreamException, IOException { + private void processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException { boolean isLarge = false; for (int i = 0; i < reader.getAttributeCount(); i++) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java new file mode 100644 index 0000000..9a58819 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/ICoreMessage.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.api.core; + +import java.io.InputStream; +import java.util.Map; + +import org.apache.activemq.artemis.core.message.LargeBodyEncoder; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; + +/** + * This interface is only to determine the API of methods required for Core Messages + */ +public interface ICoreMessage extends Message { + + LargeBodyEncoder getBodyEncoder() throws ActiveMQException; + + int getHeadersAndPropertiesEncodeSize(); + + @Override + InputStream getBodyInputStream(); + + /** Returns a new Buffer slicing the current Body. */ + ActiveMQBuffer getReadOnlyBodyBuffer(); + + /** Return the type of the message */ + @Override + byte getType(); + + /** the type of the message */ + @Override + CoreMessage setType(byte type); + + /** + * We are really interested if this is a LargeServerMessage. + * @return + */ + boolean isServerMessage(); + + /** + * The body used for this message. + * @return + */ + @Override + ActiveMQBuffer getBodyBuffer(); + + int getEndOfBodyPosition(); + + + /** + * @return Returns the message in Map form, useful when encoding to JSON + */ + @Override + default Map<String, Object> toMap() { + Map map = toPropertyMap(); + map.put("messageID", getMessageID()); + Object userID = getUserID(); + if (getUserID() != null) { + map.put("userID", "ID:" + userID.toString()); + } + + map.put("address", getAddress()); + map.put("type", getType()); + map.put("durable", isDurable()); + map.put("expiration", getExpiration()); + map.put("timestamp", getTimestamp()); + map.put("priority", (int)getPriority()); + + return map; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index b08202d..73ee856 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -22,8 +22,6 @@ import java.util.Map; import java.util.Set; import io.netty.buffer.ByteBuf; -import org.apache.activemq.artemis.api.core.encode.BodyType; -import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.persistence.Persister; /** @@ -166,36 +164,44 @@ public interface Message { byte STREAM_TYPE = 6; - - void messageChanged(); - - /** Used to calculate what is the delivery time. - * Return null if not scheduled. */ - Long getScheduledDeliveryTime(); - - /** Used for Large messages on Core. - * Do not use this, it will go away - * @deprecated use it directly from core message, as it doesn't make sense on other protocols */ + /** + * @deprecated do not use this, use through ICoreMessage or ClientMessage + */ @Deprecated default InputStream getBodyInputStream() { return null; } /** - * Careful: Unless you are changing the body of the message, prefer getReadOnlyBodyBuffer - * @deprecated use it directly from core message, as it doesn't make sense on other protocols */ + * @deprecated do not use this, use through ICoreMessage or ClientMessage + */ @Deprecated - ActiveMQBuffer getBodyBuffer(); + default ActiveMQBuffer getBodyBuffer() { + return null; + } - /** - * @deprecated use it directly from core message, as it doesn't make sense on other protocols */ + /** + * @deprecated do not use this, use through ICoreMessage or ClientMessage + */ @Deprecated - ActiveMQBuffer getReadOnlyBodyBuffer(); + default byte getType() { + return (byte)0; + } - /** Used in the cases of large messages - * @deprecated use it directly from core message, as it doesn't make sense on other protocols */ + /** + * @deprecated do not use this, use through ICoreMessage or ClientMessage + */ @Deprecated - LargeBodyEncoder getBodyEncoder() throws ActiveMQException; + default Message setType(byte type) { + return this; + } + + + void messageChanged(); + + /** Used to calculate what is the delivery time. + * Return null if not scheduled. */ + Long getScheduledDeliveryTime(); /** Context can be used by the application server to inject extra control, like a protocol specific on the server. * There is only one per Object, use it wisely! @@ -209,27 +215,6 @@ public interface Message { /** The buffer will belong to this message, until release is called. */ Message setBuffer(ByteBuf buffer); - // TODO-now: Do we need this? - byte getType(); - - // TODO-now: Do we need this? - Message setType(byte type); - - /** - * Returns whether this message is a <em>large message</em> or a regular message. - */ - boolean isLargeMessage(); - - /** - * TODO: There's currently some treatment on LargeMessage that is done for server's side large message - * This needs to be refactored, this Method shouldn't be used at all. - * @Deprecated do not use this, internal use only. *It will* be removed for sure even on minor releases. - * */ - @Deprecated - default boolean isServerMessage() { - return false; - } - ByteBuf getBuffer(); /** It will generate a new instance of the message encode, being a deep copy, new properties, new everything */ @@ -247,6 +232,10 @@ public interface Message { Message setMessageID(long id); + default boolean isLargeMessage() { + return false; + } + /** * Returns the expiration time of this message. */ @@ -297,16 +286,6 @@ public interface Message { Persister<Message> getPersister(); - Object getProtocol(); - - Message setProtocol(Object protocol); - - Object getBody(); - - BodyType getBodyType(); - - Message setBody(BodyType type, Object body); - String getAddress(); Message setAddress(String address); @@ -356,16 +335,6 @@ public interface Message { } setBuffer(null); } - - default String getText() { - if (getBodyType() == BodyType.Text) { - return getBody().toString(); - } else { - return null; - } - } - - // TODO-now: move this to some utility class default void referenceOriginalMessage(final Message original, String originalQueue) { String queueOnMessage = original.getStringProperty(Message.HDR_ORIGINAL_QUEUE.toString()); @@ -559,7 +528,6 @@ public interface Message { } map.put("address", getAddress()); - map.put("type", getBodyType().toString()); map.put("durable", isDurable()); map.put("expiration", getExpiration()); map.put("timestamp", getTimestamp()); @@ -581,7 +549,7 @@ public interface Message { /** This should make you convert your message into Core format. */ - Message toCore(); + ICoreMessage toCore(); int getMemoryEstimate(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java index daded00..67f2150 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientMessage.java @@ -19,14 +19,15 @@ package org.apache.activemq.artemis.api.core.client; import java.io.InputStream; import java.io.OutputStream; +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.SimpleString; /** * A ClientMessage represents a message sent and/or received by ActiveMQ Artemis. */ -public interface ClientMessage extends Message { +public interface ClientMessage extends ICoreMessage { /** * Returns the number of times this message was delivered. @@ -123,126 +124,140 @@ public interface ClientMessage extends Message { ClientMessage setBodyInputStream(InputStream bodyInputStream); /** - * Overridden from {@link Message} to enable fluent API + * Return the bodyInputStream for large messages + * @return + */ + @Override + InputStream getBodyInputStream(); + + /** + * The buffer to write the body. + * @return + */ + @Override + ActiveMQBuffer getBodyBuffer(); + + /** + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putBooleanProperty(SimpleString key, boolean value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putBooleanProperty(String key, boolean value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putByteProperty(SimpleString key, byte value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putByteProperty(String key, byte value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putBytesProperty(SimpleString key, byte[] value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putBytesProperty(String key, byte[] value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putShortProperty(SimpleString key, short value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putShortProperty(String key, short value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putCharProperty(SimpleString key, char value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putCharProperty(String key, char value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putIntProperty(SimpleString key, int value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putIntProperty(String key, int value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putLongProperty(SimpleString key, long value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putLongProperty(String key, long value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putFloatProperty(SimpleString key, float value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putFloatProperty(String key, float value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putDoubleProperty(SimpleString key, double value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putDoubleProperty(String key, double value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ @Override ClientMessage putStringProperty(String key, String value); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ ClientMessage writeBodyBufferBytes(byte[] bytes); /** - * Overridden from {@link Message} to enable fluent API + * Overridden from {@link org.apache.activemq.artemis.api.core.Message} to enable fluent API */ ClientMessage writeBodyBufferString(String string); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java index 40211c1..946285d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java @@ -18,9 +18,11 @@ package org.apache.activemq.artemis.api.core.management; import javax.json.JsonArray; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; /** * Helper class to use ActiveMQ Artemis Core messages to manage server resources. @@ -86,7 +88,7 @@ public final class ManagementHelper { * @param attribute the name of the attribute * @see ResourceNames */ - public static void putAttribute(final Message message, final String resourceName, final String attribute) { + public static void putAttribute(final ICoreMessage message, final String resourceName, final String attribute) { message.putStringProperty(ManagementHelper.HDR_RESOURCE_NAME, new SimpleString(resourceName)); message.putStringProperty(ManagementHelper.HDR_ATTRIBUTE, new SimpleString(attribute)); } @@ -99,7 +101,7 @@ public final class ManagementHelper { * @param operationName the name of the operation to invoke on the resource * @see ResourceNames */ - public static void putOperationInvocation(final Message message, + public static void putOperationInvocation(final ICoreMessage message, final String resourceName, final String operationName) throws Exception { ManagementHelper.putOperationInvocation(message, resourceName, operationName, (Object[]) null); @@ -114,7 +116,7 @@ public final class ManagementHelper { * @param parameters the parameters to use to invoke the server resource * @see ResourceNames */ - public static void putOperationInvocation(final Message message, + public static void putOperationInvocation(final ICoreMessage message, final String resourceName, final String operationName, final Object... parameters) throws Exception { @@ -141,7 +143,7 @@ public final class ManagementHelper { * Used by ActiveMQ Artemis management service. */ public static Object[] retrieveOperationParameters(final Message message) throws Exception { - SimpleString sstring = message.getBodyBuffer().readNullableSimpleString(); + SimpleString sstring = message.toCore().getReadOnlyBodyBuffer().readNullableSimpleString(); String jsonString = (sstring == null) ? null : sstring.toString(); if (jsonString != null) { @@ -170,7 +172,7 @@ public final class ManagementHelper { /** * Used by ActiveMQ Artemis management service. */ - public static void storeResult(final Message message, final Object result) throws Exception { + public static void storeResult(final CoreMessage message, final Object result) throws Exception { String resultString; if (result != null) { @@ -192,7 +194,7 @@ public final class ManagementHelper { * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}. * and the result will be a String corresponding to the server exception. */ - public static Object[] getResults(final Message message) throws Exception { + public static Object[] getResults(final ICoreMessage message) throws Exception { SimpleString sstring = message.getBodyBuffer().readNullableSimpleString(); String jsonString = (sstring == null) ? null : sstring.toString(); @@ -210,7 +212,7 @@ public final class ManagementHelper { * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}. * and the result will be a String corresponding to the server exception. */ - public static Object getResult(final Message message) throws Exception { + public static Object getResult(final ICoreMessage message) throws Exception { return getResult(message, null); } @@ -220,7 +222,7 @@ public final class ManagementHelper { * If an error occurred on the server, {@link #hasOperationSucceeded(Message)} will return {@code false}. * and the result will be a String corresponding to the server exception. */ - public static Object getResult(final Message message, Class desiredType) throws Exception { + public static Object getResult(final ICoreMessage message, Class desiredType) throws Exception { Object[] res = ManagementHelper.getResults(message); if (res != null) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java index d95aeba..82af968 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java @@ -625,7 +625,7 @@ public final class ClientConsumerImpl implements ClientConsumerInternal { currentLargeMessageController.setLocal(true); //sets the packet - ActiveMQBuffer qbuff = clMessage.getBodyBuffer(); + ActiveMQBuffer qbuff = clMessage.toCore().getBodyBuffer(); int bytesToRead = qbuff.writerIndex() - qbuff.readerIndex(); final byte[] body = ByteUtil.getActiveArray(qbuff.readBytes(bytesToRead).toByteBuffer()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index 1704de0..ce4a8a1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -23,12 +23,12 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.message.LargeBodyEncoder; -import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; import org.apache.activemq.artemis.utils.ActiveMQBufferInputStream; import org.apache.activemq.artemis.utils.DeflaterReader; @@ -218,7 +218,7 @@ public class ClientProducerImpl implements ClientProducerInternal { try { // In case we received message from another protocol, we first need to convert it to core as the ClientProducer only understands core - Message msg = msgToSend.toCore(); + ICoreMessage msg = msgToSend.toCore(); ClientProducerCredits theCredits; @@ -259,7 +259,7 @@ public class ClientProducerImpl implements ClientProducerInternal { session.workDone(); if (isLarge) { - largeMessageSend(sendBlocking, (CoreMessage)msg, theCredits, handler); + largeMessageSend(sendBlocking, msg, theCredits, handler); } else { sendRegularMessage(sendingAddress, msg, sendBlocking, theCredits, handler); } @@ -268,12 +268,12 @@ public class ClientProducerImpl implements ClientProducerInternal { } } - private InputStream getBodyInputStream(Message msgI) { + private InputStream getBodyInputStream(ICoreMessage msgI) { return msgI.getBodyInputStream(); } private void sendRegularMessage(final SimpleString sendingAddress, - final Message msgI, + final ICoreMessage msgI, final boolean sendBlocking, final ClientProducerCredits theCredits, final SendAcknowledgementHandler handler) throws ActiveMQException { @@ -306,7 +306,7 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSend(final boolean sendBlocking, - final CoreMessage msgI, + final ICoreMessage msgI, final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException { logger.tracef("largeMessageSend::%s, Blocking=%s", msgI, sendBlocking); @@ -353,7 +353,7 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSendServer(final boolean sendBlocking, - final Message msgI, + final ICoreMessage msgI, final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException { sendInitialLargeMessageHeader(msgI, credits); @@ -394,7 +394,7 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSendBuffered(final boolean sendBlocking, - final Message msgI, + final ICoreMessage msgI, final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException { msgI.getBodyBuffer().readerIndex(0); @@ -409,7 +409,7 @@ public class ClientProducerImpl implements ClientProducerInternal { * @throws ActiveMQException */ private void largeMessageSendStreamed(final boolean sendBlocking, - final Message msgI, + final ICoreMessage msgI, final InputStream inputStreamParameter, final ClientProducerCredits credits, SendAcknowledgementHandler handler) throws ActiveMQException { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index f620a1d..bf642e0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.core.message.impl; +import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Set; @@ -25,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.SimpleString; @@ -41,7 +43,7 @@ import org.jboss.logging.Logger; /** Note: you shouldn't change properties using multi-threads. Change your properties before you can send it to multiple * consumers */ -public class CoreMessage extends RefCountMessage { +public class CoreMessage extends RefCountMessage implements ICoreMessage { public static final int BUFFER_HEADER_SPACE = PacketImpl.PACKET_HEADERS_SIZE; @@ -89,23 +91,10 @@ public class CoreMessage extends RefCountMessage { protected volatile TypedProperties properties; - private Object protocol; - public CoreMessage() { } @Override - public CoreMessage setProtocol(Object protocol) { - this.protocol = protocol; - return this; - } - - @Override - public Object getProtocol() { - return protocol; - } - - @Override public Persister<Message> getPersister() { return CoreMessagePersister.getInstance(); } @@ -164,6 +153,11 @@ public class CoreMessage extends RefCountMessage { return null; } + @Override + public InputStream getBodyInputStream() { + return null; + } + /** * {@inheritDoc} */ @@ -187,6 +181,7 @@ public class CoreMessage extends RefCountMessage { } } + @Override public int getEndOfBodyPosition() { if (endOfBodyPosition < 0) { endOfBodyPosition = getBodyBuffer().writerIndex(); @@ -238,7 +233,7 @@ public class CoreMessage extends RefCountMessage { messageID = msg.getMessageID(); address = msg.getAddressSimpleString(); userID = (UUID)msg.getUserID(); - type = msg.getType(); + type = msg.toCore().getType(); durable = msg.isDurable(); expiration = msg.getExpiration(); timestamp = msg.getTimestamp(); @@ -369,6 +364,17 @@ public class CoreMessage extends RefCountMessage { } @Override + public boolean isServerMessage() { + // even though CoreMessage is used both on server and client + // callers are interested in knowing if this is a server large message + // as it will be used to send the body from the files. + // + // this may need further refactoring when we improve large messages + // and expose that functionality to other protocols. + return false; + } + + @Override public byte getType() { return type; } @@ -467,27 +473,6 @@ public class CoreMessage extends RefCountMessage { } @Override - public Object getBody() { - - if (body == null) { - decodeBody(); - } - - return body; - } - - private void decodeBody() { - buffer.readerIndex(DataConstants.SIZE_INT); - switch (getBodyType()) { - case Text: - body = SimpleString.readNullableSimpleString(buffer); - break; - - default: - break; - } - } - public int getHeadersAndPropertiesEncodeSize() { return DataConstants.SIZE_LONG + // Message ID DataConstants.SIZE_BYTE + // user id null? @@ -501,10 +486,6 @@ public class CoreMessage extends RefCountMessage { /* PropertySize and Properties */checkProperties().getEncodeSize(); } - @Override - public BodyType getBodyType() { - return getBodyType(type); - } public static BodyType getBodyType(byte type) { switch (type) { @@ -540,16 +521,6 @@ public class CoreMessage extends RefCountMessage { } @Override - public CoreMessage setBody(final BodyType bodyType, Object body) { - messageChanged(); - - this.type = Message.TEXT_TYPE; - this.body = body; - - return this; - } - - @Override public boolean isLargeMessage() { return false; } @@ -1016,7 +987,7 @@ public class CoreMessage extends RefCountMessage { } @Override - public Message toCore() { + public CoreMessage toCore() { return this; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 9975a5b..38cc177 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -31,6 +31,7 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -427,7 +428,7 @@ public class ActiveMQSessionContext extends SessionContext { } @Override - public void sendFullMessage(Message msgI, + public void sendFullMessage(ICoreMessage msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java index ec2520a..49989d3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/MessagePacket.java @@ -16,26 +16,25 @@ */ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; - import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; public abstract class MessagePacket extends PacketImpl implements MessagePacketI { - protected Message message; + protected ICoreMessage message; - public MessagePacket(final byte type, final Message message) { + public MessagePacket(final byte type, final ICoreMessage message) { super(type); this.message = message; } @Override - public Message getMessage() { + public ICoreMessage getMessage() { return message; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java index c03d3c8..b0ab52b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionReceiveMessage.java @@ -17,8 +17,8 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; - -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.DataConstants; @@ -31,7 +31,7 @@ public class SessionReceiveMessage extends MessagePacket { private int deliveryCount; - public SessionReceiveMessage(final long consumerID, final Message message, final int deliveryCount) { + public SessionReceiveMessage(final long consumerID, final ICoreMessage message, final int deliveryCount) { super(SESS_RECEIVE_MSG, message); this.consumerID = consumerID; @@ -39,7 +39,7 @@ public class SessionReceiveMessage extends MessagePacket { this.deliveryCount = deliveryCount; } - public SessionReceiveMessage(final Message message) { + public SessionReceiveMessage(final CoreMessage message) { super(SESS_RECEIVE_MSG, message); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java index 8182b90..43bb0be 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendMessage.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -38,7 +38,7 @@ public class SessionSendMessage extends MessagePacket { private final transient SendAcknowledgementHandler handler; /** This will be using the CoreMessage because it is meant for the core-protocol */ - public SessionSendMessage(final Message message, + public SessionSendMessage(final ICoreMessage message, final boolean requiresResponse, final SendAcknowledgementHandler handler) { super(SESS_SEND, message); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java index 8bb0081..3fddb8e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/spi/core/remoting/SessionContext.java @@ -23,7 +23,9 @@ import java.util.Set; import java.util.concurrent.Executor; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientSession; @@ -33,7 +35,6 @@ import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientMessageInternal; import org.apache.activemq.artemis.core.client.impl.ClientProducerCreditsImpl; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; @@ -129,7 +130,7 @@ public abstract class SessionContext { public abstract int getCreditsOnSendingFull(Message msgI); - public abstract void sendFullMessage(Message msgI, + public abstract void sendFullMessage(ICoreMessage msgI, boolean sendBlocking, SendAcknowledgementHandler handler, SimpleString defaultAddress) throws ActiveMQException; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java ---------------------------------------------------------------------- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java index 0e99106..4d0306b 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSManagementHelper.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.api.jms.management; import javax.jms.JMSException; import javax.jms.Message; +import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.jms.client.ActiveMQMessage; @@ -27,7 +28,7 @@ import org.apache.activemq.artemis.jms.client.ActiveMQMessage; */ public class JMSManagementHelper { - private static org.apache.activemq.artemis.api.core.Message getCoreMessage(final Message jmsMessage) { + private static ClientMessage getCoreMessage(final Message jmsMessage) { if (jmsMessage instanceof ActiveMQMessage == false) { throw new IllegalArgumentException("Cannot send a foreign message as a management message " + jmsMessage.getClass().getName()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java ---------------------------------------------------------------------- diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java index 289f88c..ecb4ccb 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/transaction/JMSTransactionDetail.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.jms.transaction; import javax.transaction.xa.Xid; import java.util.Map; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.TransactionDetail; @@ -37,7 +38,10 @@ public class JMSTransactionDetail extends TransactionDetail { @Override public String decodeMessageType(Message msg) { - int type = msg.getType(); + if (!(msg instanceof ICoreMessage)) { + return "N/A"; + } + int type = ((ICoreMessage) msg).getType(); switch (type) { case ActiveMQMessage.TYPE: // 0 return "Default"; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 772f2cd..456d281 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -27,13 +27,12 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.RefCountMessage; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.encode.BodyType; -import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.core.persistence.Persister; +import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.utils.DataConstants; @@ -60,7 +59,6 @@ public class AMQPMessage extends RefCountMessage { String address; MessageImpl protonMessage; private volatile int memoryEstimate = -1; - private ProtonProtocolManager protocolManager; private long expiration = 0; // this can be used to encode the header again and the rest of the message buffer private int headerEnd = -1; @@ -71,8 +69,7 @@ public class AMQPMessage extends RefCountMessage { private Properties _properties; private ApplicationProperties applicationProperties; - public AMQPMessage(long messageFormat, byte[] data, ProtonProtocolManager protocolManager) { - this.protocolManager = protocolManager; + public AMQPMessage(long messageFormat, byte[] data) { this.data = Unpooled.wrappedBuffer(data); this.messageFormat = messageFormat; this.bufferValid = true; @@ -86,15 +83,14 @@ public class AMQPMessage extends RefCountMessage { } - public AMQPMessage(long messageFormat, Message message, ProtonProtocolManager protocolManager) { - this.protocolManager = protocolManager; - this.protonMessage = (MessageImpl)message; + public AMQPMessage(long messageFormat, Message message) { this.messageFormat = messageFormat; + this.protonMessage = (MessageImpl)message; } - public AMQPMessage(Message message, ProtonProtocolManager protocolManager) { - this(0, message, protocolManager); + public AMQPMessage(Message message) { + this(0, message); } public MessageImpl getProtonMessage() { @@ -292,40 +288,6 @@ public class AMQPMessage extends RefCountMessage { } @Override - public ActiveMQBuffer getBodyBuffer() { - // NO-IMPL - return null; - } - - @Override - public ActiveMQBuffer getReadOnlyBodyBuffer() { - // NO-IMPL - return null; - } - - @Override - public LargeBodyEncoder getBodyEncoder() throws ActiveMQException { - // NO-IMPL - return null; - } - - @Override - public byte getType() { - return type; - } - - @Override - public AMQPMessage setType(byte type) { - this.type = type; - return this; - } - - @Override - public boolean isLargeMessage() { - return false; - } - - @Override public ByteBuf getBuffer() { if (data == null) { return null; @@ -342,12 +304,14 @@ public class AMQPMessage extends RefCountMessage { @Override public org.apache.activemq.artemis.api.core.Message copy() { - AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array(), protocolManager); + checkBuffer(); + AMQPMessage newEncode = new AMQPMessage(this.messageFormat, data.array()); return newEncode; } @Override public org.apache.activemq.artemis.api.core.Message copy(long newID) { + checkBuffer(); return copy().setMessageID(newID); } @@ -403,32 +367,6 @@ public class AMQPMessage extends RefCountMessage { } @Override - public Object getProtocol() { - return protocolManager; - } - - @Override - public AMQPMessage setProtocol(Object protocol) { - this.protocolManager = (ProtonProtocolManager)protocol; - return this; - } - - @Override - public Object getBody() { - return null; - } - - @Override - public BodyType getBodyType() { - return null; - } - - @Override - public org.apache.activemq.artemis.api.core.Message setBody(BodyType type, Object body) { - return null; - } - - @Override public String getAddress() { if (address == null) { Properties properties = getProtonMessage().getProperties(); @@ -794,9 +732,12 @@ public class AMQPMessage extends RefCountMessage { } @Override - public org.apache.activemq.artemis.api.core.Message toCore() { - MessageImpl protonMessage = getProtonMessage(); - throw new IllegalStateException("conversion between AMQP and Core not implemented yet!"); + public ICoreMessage toCore() { + try { + return AMQPConverter.getInstance().toCore(this); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index f34298c..5931afe 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.io.IOCallback; @@ -33,14 +34,13 @@ import org.apache.activemq.artemis.core.server.AddressQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; -import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; +import org.apache.activemq.artemis.protocol.amqp.converter.CoreAmqpConverter; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; @@ -64,7 +64,6 @@ import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; @@ -296,13 +295,6 @@ public class AMQPSessionCallback implements SessionCallback { } } - public long encodeMessage(Message message, int deliveryCount, WritableBuffer buffer) throws Exception { - ProtonMessageConverter converter = (ProtonMessageConverter) manager.getConverter(); - - // The Proton variant accepts a WritableBuffer to allow for a faster more direct encode. - return (long) converter.outbound(message, deliveryCount, buffer); - } - public String tempQueueName() { return UUIDGenerator.getInstance().generateStringUUID(); } @@ -350,7 +342,7 @@ public class AMQPSessionCallback implements SessionCallback { String address, int messageFormat, byte[] data) throws Exception { - AMQPMessage message = new AMQPMessage(messageFormat, data, manager); + AMQPMessage message = new AMQPMessage(messageFormat, data); if (address != null) { message.setAddress(new SimpleString(address)); } else { @@ -494,7 +486,7 @@ public class AMQPSessionCallback implements SessionCallback { ProtonServerSenderContext plugSender = (ProtonServerSenderContext) consumer.getProtocolContext(); try { - return plugSender.deliverMessage(message, deliveryCount); + return plugSender.deliverMessage(CoreAmqpConverter.checkAMQP(message), deliveryCount); } catch (Exception e) { synchronized (connection.getLock()) { plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage())); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java index 754172a..9c7d24d 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java @@ -26,19 +26,17 @@ import io.netty.channel.ChannelPipeline; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.activemq.artemis.protocol.amqp.converter.ProtonMessageConverter; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -54,8 +52,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti private final ActiveMQServer server; - private MessageConverter protonConverter; - private final ProtonProtocolManagerFactory factory; private final Map<SimpleString, RoutingType> prefixes = new HashMap<>(); @@ -72,7 +68,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti public ProtonProtocolManager(ProtonProtocolManagerFactory factory, ActiveMQServer server) { this.factory = factory; this.server = server; - this.protonConverter = new ProtonMessageConverter(server.getStorageManager()); } public ActiveMQServer getServer() { @@ -80,11 +75,6 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti } @Override - public MessageConverter getConverter() { - return protonConverter; - } - - @Override public void onNotification(Notification notification) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b557f2df/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java new file mode 100644 index 0000000..724474b --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPConverter.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.protocol.amqp.converter; + +import org.apache.activemq.artemis.api.core.ICoreMessage; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; +import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; + + +public class AMQPConverter implements MessageConverter<AMQPMessage> { + + private static final AMQPConverter theInstance = new AMQPConverter(); + + private AMQPConverter() { + } + + public static AMQPConverter getInstance() { + return theInstance; + } + + @Override + public AMQPMessage fromCore(ICoreMessage coreMessage) throws Exception { + return CoreAmqpConverter.fromCore(coreMessage); + } + + @Override + public ICoreMessage toCore(AMQPMessage messageSource) throws Exception { + return AmqpCoreConverter.toCore(messageSource); + } +}