QPID-7434: [Java Broker] Improve AMQP 0-8 to 1.0 content conversion and unit tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/20c9c58c Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/20c9c58c Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/20c9c58c Branch: refs/heads/master Commit: 20c9c58c9469abd6398f16644c5b0e5c751a06dc Parents: 67aa48f Author: Alex Rudyy <oru...@apache.org> Authored: Tue Aug 8 13:10:23 2017 +0100 Committer: Lorenz Quack <lqu...@apache.org> Committed: Wed Aug 9 14:30:59 2017 +0100 ---------------------------------------------------------------------- .../mimecontentconverter/IdentityConverter.java | 62 ++++ .../MimeContentConverterRegistry.java | 3 +- .../v1_0/MessageConverter_from_1_0.java | 14 +- .../protocol/v1_0/MessageConverter_to_1_0.java | 226 ++++++++++++- .../MessageConverter_1_0_to_v0_10Test.java | 159 ++++++++- .../v0_8_v1_0/MessageConverter_0_8_to_1_0.java | 21 +- .../MessageConverter_0_8_to_1_0Test.java | 330 ++++++++++++++++--- .../MessageConverter_1_0_to_v0_8Test.java | 156 ++++++++- 8 files changed, 880 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/IdentityConverter.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/IdentityConverter.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/IdentityConverter.java new file mode 100644 index 0000000..3b82582 --- /dev/null +++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/IdentityConverter.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message.mimecontentconverter; + +import org.apache.qpid.server.plugin.PluggableService; + +@PluggableService +public class IdentityConverter implements ObjectToMimeContentConverter<Object> +{ + @Override + public String getType() + { + return getMimeType(); + } + + @Override + public String getMimeType() + { + return null; + } + + @Override + public Class<Object> getObjectClass() + { + return Object.class; + } + + @Override + public int getRank() + { + return Integer.MIN_VALUE; + } + + @Override + public boolean isAcceptable(final Object object) + { + return object == null; + } + + @Override + public byte[] toMimeContent(final Object object) + { + return new byte[0]; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java ---------------------------------------------------------------------- diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java index b580557..79cbba7 100644 --- a/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java +++ b/broker-core/src/main/java/org/apache/qpid/server/message/mimecontentconverter/MimeContentConverterRegistry.java @@ -72,7 +72,7 @@ public class MimeContentConverterRegistry } classToMineConverters.put(objectClass, converter); } - classToMineConverters.put(Void.class, new StringToTextPlain()); + classToMineConverters.put(Void.class, new IdentityConverter()); return ImmutableMultimap.copyOf(classToMineConverters); } @@ -137,6 +137,7 @@ public class MimeContentConverterRegistry } } } + return converter; } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java index 71add9e..62230d8 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java @@ -76,11 +76,12 @@ public class MessageConverter_from_1_0 byte[].class, UUID.class)); - private static final Pattern TEXT_CONTENT_TYPES = Pattern.compile("^(text/.*)|(application/(xml|xml-dtd|.*\\+xml|json|.*\\+json|javascript|ecmascript))$"); - private static final Pattern MAP_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/map|jms/map-message$"); - private static final Pattern LIST_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/list|jms/stream-message$"); - private static final Pattern + public static final Pattern TEXT_CONTENT_TYPES = Pattern.compile("^(text/.*)|(application/(xml|xml-dtd|.*\\+xml|json|.*\\+json|javascript|ecmascript))$"); + public static final Pattern MAP_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/map|jms/map-message$"); + public static final Pattern LIST_MESSAGE_CONTENT_TYPES = Pattern.compile("^amqp/list|jms/stream-message$"); + public static final Pattern OBJECT_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/x-java-serialized-object|application/java-object-stream$"); + public static final Pattern BYTES_MESSAGE_CONTENT_TYPES = Pattern.compile("^application/octet-stream$"); static Object convertBodyToObject(final Message_1_0 serverMessage) { @@ -320,6 +321,11 @@ public class MessageConverter_from_1_0 // the AMQP 0-x client does not accept the "application/x-java-serialized-object" mimeTypes so use fall back supportedContentType = "application/java-object-stream"; } + else if (BYTES_MESSAGE_CONTENT_TYPES.matcher(type).matches()) + { + contentTypeClassHint = byte[].class; + supportedContentType = "application/octet-stream"; + } if (classHint == null || classHint == contentTypeClassHint) { http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java index 53c6666..09b9964 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -20,9 +20,22 @@ */ package org.apache.qpid.server.protocol.v1_0; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.BYTES_MESSAGE; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MAP_MESSAGE; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.MESSAGE; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.OBJECT_MESSAGE; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.STREAM_MESSAGE; +import static org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation.TEXT_MESSAGE; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.ListIterator; @@ -30,8 +43,8 @@ import java.util.Map; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter; import org.apache.qpid.server.message.mimecontentconverter.MimeContentConverterRegistry; +import org.apache.qpid.server.message.mimecontentconverter.MimeContentToObjectConverter; import org.apache.qpid.server.model.NamedAddressSpace; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder; @@ -39,20 +52,163 @@ import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl; import org.apache.qpid.server.protocol.v1_0.type.Binary; import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequence; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.Data; import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations; import org.apache.qpid.server.protocol.v1_0.type.messaging.NonEncodingRetainingSection; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.GZIPUtils; public abstract class MessageConverter_to_1_0<M extends ServerMessage> implements MessageConverter<M, Message_1_0> { + private static final byte[] SERIALIZED_NULL = getObjectBytes(null); private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance() - .registerTransportLayer() - .registerMessagingLayer() - .registerTransactionLayer() - .registerSecurityLayer(); + .registerTransportLayer() + .registerMessagingLayer() + .registerTransactionLayer() + .registerSecurityLayer(); + + public static Symbol getContentType(final String contentMimeType, final EncodingRetainingSection<?> bodySection) + { + Symbol contentType = null; + if (contentMimeType != null) + { + if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(contentMimeType).matches()) + { + contentType = Symbol.valueOf(contentMimeType); + } + else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + { + contentType = Symbol.valueOf("application/octet-stream"); + } + else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + { + contentType = null; + } + else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + { + contentType = null; + } + else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + { + contentType = Symbol.valueOf("application/x-java-serialized-object"); + } + else + { + contentType = Symbol.valueOf(contentMimeType); + } + } + return contentType; + } + + public static MessageAnnotations createMessageAnnotation(final EncodingRetainingSection<?> bodySection, + final String contentMimeType) + { + MessageAnnotations messageAnnotations = null; + final Symbol key = Symbol.valueOf("x-opt-jms-msg-type"); + if (contentMimeType != null) + { + if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(contentMimeType).matches()) + { + messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, TEXT_MESSAGE.getType())); + } + else if (MessageConverter_from_1_0.BYTES_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + { + messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, BYTES_MESSAGE.getType())); + } + else if (MessageConverter_from_1_0.MAP_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + { + if (isSectionValidForJmsMap(bodySection)) + { + messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, MAP_MESSAGE.getType())); + } + } + else if (MessageConverter_from_1_0.LIST_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + { + if (isSectionValidForJmsList(bodySection)) + { + messageAnnotations = + new MessageAnnotations(Collections.singletonMap(key, STREAM_MESSAGE.getType())); + } + } + else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(contentMimeType).matches()) + { + messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, OBJECT_MESSAGE.getType())); + } + } + else if (bodySection instanceof AmqpValueSection && bodySection.getValue() == null) + { + messageAnnotations = new MessageAnnotations(Collections.singletonMap(key, MESSAGE.getType())); + } + return messageAnnotations; + } + + private static boolean isSectionValidForJmsList(final EncodingRetainingSection<?> section) + { + if (section instanceof AmqpSequenceSection) + { + final List<?> list = ((AmqpSequenceSection) section).getValue(); + for (Object entry: list) + { + if (!(entry == null + || entry instanceof Boolean + || entry instanceof Byte + || entry instanceof Short + || entry instanceof Integer + || entry instanceof Long + || entry instanceof Float + || entry instanceof Double + || entry instanceof Character + || entry instanceof String + || entry instanceof Binary)) + { + return false; + } + } + return true; + } + return false; + } + + private static boolean isSectionValidForJmsMap(final EncodingRetainingSection<?> section) + { + if (section instanceof AmqpValueSection) + { + final Object valueObject = ((AmqpValueSection) section).getValue(); + if (valueObject instanceof Map) + { + final Map<?, ?> map = (Map) valueObject; + for (Map.Entry<?,?> entry: map.entrySet()) + { + if (!(entry.getKey() instanceof String)) + { + return false; + } + Object value = entry.getValue(); + if (!(value == null + || value instanceof Boolean + || value instanceof Byte + || value instanceof Short + || value instanceof Integer + || value instanceof Long + || value instanceof Float + || value instanceof Double + || value instanceof Character + || value instanceof String + || value instanceof Binary)) + { + return false; + } + } + return true; + } + } + return false; + } @Override public final Class<Message_1_0> getOutputClass() @@ -92,25 +248,45 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement private static NonEncodingRetainingSection<?> convertMessageBody(String mimeType, byte[] data) { - - MimeContentToObjectConverter converter = MimeContentConverterRegistry.getMimeContentToObjectConverter(mimeType); - if (converter != null) + if (data != null && data.length != 0) { - Object bodyObject = converter.toObject(data); - if (bodyObject instanceof String) + MimeContentToObjectConverter converter = + MimeContentConverterRegistry.getMimeContentToObjectConverter(mimeType); + if (converter != null) { - return new AmqpValue(bodyObject); - } - else if (bodyObject instanceof Map) - { - return new AmqpValue(fixMapValues((Map<String, Object>) bodyObject)); + Object bodyObject = converter.toObject(data); + + if (bodyObject instanceof String) + { + return new AmqpValue(bodyObject); + } + else if (bodyObject instanceof Map) + { + return new AmqpValue(fixMapValues((Map<String, Object>) bodyObject)); + } + else if (bodyObject instanceof List) + { + return new AmqpSequence(fixListValues((List<Object>) bodyObject)); + } } - else if (bodyObject instanceof List) + else if (mimeType != null && MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches()) { - return new AmqpValue(fixListValues((List<Object>) bodyObject)); + return new AmqpValue(new String(data, UTF_8)); } } + else if (mimeType == null) + { + return new AmqpValue(null); + } + else if (MessageConverter_from_1_0.OBJECT_MESSAGE_CONTENT_TYPES.matcher(mimeType).matches()) + { + return new Data(new Binary(SERIALIZED_NULL)); + } + else if (MessageConverter_from_1_0.TEXT_CONTENT_TYPES.matcher(mimeType).matches()) + { + return new AmqpValue(null); + } return new Data(new Binary(data)); } @@ -188,6 +364,22 @@ public abstract class MessageConverter_to_1_0<M extends ServerMessage> implement return convertMessageBody(mimeType, data).createEncodingRetainingSection(); } + private static byte[] getObjectBytes(final Object object) + { + final byte[] expected; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) + { + oos.writeObject(object); + expected = baos.toByteArray(); + } + catch (IOException e) + { + throw new IllegalStateException(e); + } + return expected; + } + private static class ConvertedMessage<M extends ServerMessage> implements StoredMessage<MessageMetaData_1_0> { private final MessageMetaData_1_0 _metaData; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10Test.java b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10Test.java index 9e59fe6..3e83f73 100644 --- a/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10Test.java +++ b/broker-plugins/amqp-msg-conv-0-10-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_1_0_to_v0_10Test.java @@ -93,11 +93,12 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase _converter = new MessageConverter_1_0_to_v0_10(); } - public void testAmqpValueWithNull() throws Exception + public void testAmqpValueWithNullWithTextMessageAnnotation() throws Exception { final Object expected = null; final AmqpValue amqpValue = new AmqpValue(expected); - Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection()); + Message_1_0 sourceMessage = + createTestMessage(TEXT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); @@ -105,16 +106,156 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase assertEquals("Unexpected content size", 0, convertedMessage.getSize()); } - public void testAmqpValueWithNullWithTextMessageAnnotation() throws Exception + public void testAmqpValueWithNullWithMessageAnnotation() throws Exception { final Object expected = null; final AmqpValue amqpValue = new AmqpValue(expected); Message_1_0 sourceMessage = - createTestMessage(TEXT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); + createTestMessage(MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); - assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected content size", 0, convertedMessage.getSize()); + } + + public void testAmqpValueWithNullWithObjectMessageAnnotation() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(OBJECT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); + + final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + + assertEquals("Unexpected mime type", + "application/java-object-stream", + convertedMessage.getMessageHeader().getMimeType()); + assertArrayEquals("Unexpected content size", getObjectBytes(null), getBytes(content)); + } + + public void testAmqpValueWithNullWithMapMessageAnnotation() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(MAP_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); + + final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + + assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType()); + assertArrayEquals("Unexpected content size", + new MapToJmsMapMessage().toMimeContent(Collections.emptyMap()), + getBytes(content)); + } + + public void testAmqpValueWithNullWithBytesMessageAnnotation() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(BYTE_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); + + final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", + "application/octet-stream", + convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected content size", 0, convertedMessage.getSize()); + } + + public void testAmqpValueWithNullWithStreamMessageAnnotation() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(STREAM_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); + + final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected content size", 0, convertedMessage.getSize()); + } + + public void testAmqpValueWithNullWithUnknownMessageAnnotation() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = + createTestMessage(new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"), + (byte) 11)), + amqpValue.createEncodingRetainingSection()); + + final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected content size", 0, convertedMessage.getSize()); + } + + public void testAmqpValueWithNullWithContentTypeApplicationOctetStream() throws Exception + { + Properties properties = new Properties(); + properties.setContentType(Symbol.valueOf("application/octet-stream")); + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection()); + + final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", "application/octet-stream", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected content size", 0, convertedMessage.getSize()); + } + + public void testAmqpValueWithNullWithObjectMessageContentType() throws Exception + { + final Properties properties = new Properties(); + properties.setContentType(Symbol.valueOf("application/x-java-serialized-object")); + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection()); + + final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", + "application/java-object-stream", + convertedMessage.getMessageHeader().getMimeType()); + + assertEquals("Unexpected content size", + getObjectBytes(null).length, + convertedMessage.getSize()); + } + + public void testAmqpValueWithNullWithJmsMapContentType() throws Exception + { + final Properties properties = new Properties(); + properties.setContentType(Symbol.valueOf("jms/map-message")); + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection()); + + final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + + assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType()); + + assertArrayEquals("Unexpected content size", + new MapToJmsMapMessage().toMimeContent(Collections.emptyMap()), + getBytes(content)); + } + + + + + public void testAmqpValueWithNull() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection()); + + final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); assertEquals("Unexpected content size", 0, convertedMessage.getSize()); } @@ -630,7 +771,7 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); - assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); assertEquals("Unexpected content size", 0, convertedMessage.getSize()); } @@ -702,7 +843,7 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); - assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); assertEquals("Unexpected content size", 0, convertedMessage.getSize()); } @@ -712,7 +853,7 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); - assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); assertEquals("Unexpected content size", 0, convertedMessage.getSize()); } @@ -724,7 +865,7 @@ public class MessageConverter_1_0_to_v0_10Test extends QpidTestCase final MessageTransferMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); - assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected mime type", "application/octet-stream", convertedMessage.getMessageHeader().getMimeType()); assertEquals("Unexpected content size", 0, convertedMessage.getSize()); } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java index 01597f2..ba3526c 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java @@ -25,12 +25,12 @@ import java.util.Date; import java.util.LinkedHashMap; import java.util.Map; +import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.protocol.converter.MessageConversionException; +import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.AMQShortString; -import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.protocol.v0_8.FieldTable; -import org.apache.qpid.server.plugin.PluggableService; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; +import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0; import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; @@ -43,6 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; +import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations; import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; import org.apache.qpid.server.url.AMQBindingURL; import org.apache.qpid.server.util.GZIPUtils; @@ -81,13 +82,8 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMess props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString())); } - props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString())); - - // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client - if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) - { - props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); - } + Symbol contentType = getContentType(contentHeader.getContentTypeAsString(), bodySection); + props.setContentType(contentType); final AMQShortString correlationId = contentHeader.getCorrelationId(); if(correlationId != null) @@ -207,9 +203,12 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMess throw new MessageConversionException("Could not convert message from 0-8 to 1.0 because headers conversion failed.", e); } + MessageAnnotations messageAnnotations = createMessageAnnotation(bodySection, + contentHeader.getContentTypeAsString()); + return new MessageMetaData_1_0(header.createEncodingRetainingSection(), null, - null, + messageAnnotations == null ? null : messageAnnotations.createEncodingRetainingSection(), props.createEncodingRetainingSection(), applicationProperties.createEncodingRetainingSection(), null, http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java index 3ef2e7d..62dc8f4d 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0Test.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.protocol.converter.v0_8_v1_0; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.qpid.server.protocol.v1_0.MessageConverter_from_1_0.getContentType; import static org.junit.Assert.assertArrayEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -43,17 +45,25 @@ import org.mockito.stubbing.Answer; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.model.NamedAddressSpace; +import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.ListToAmqpListConverter; +import org.apache.qpid.server.protocol.v0_10.transport.mimecontentconverter.MapToAmqpMapConverter; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.protocol.v0_8.MessageMetaData; import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties; import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody; import org.apache.qpid.server.protocol.v0_8.transport.MessagePublishInfo; +import org.apache.qpid.server.protocol.v1_0.JmsMessageTypeAnnotation; import org.apache.qpid.server.protocol.v1_0.Message_1_0; import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder; import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequenceSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.DataSection; import org.apache.qpid.server.protocol.v1_0.type.messaging.EncodingRetainingSection; +import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotationsSection; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.typedmessage.TypedBytesContentWriter; import org.apache.qpid.test.utils.QpidTestCase; @@ -87,78 +97,195 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase public void testConvertStringMessageBody() throws Exception { - final String expected = "helloworld"; + doTestTextMessage("helloworld", "text/plain"); + } - final AMQMessage sourceMessage = getAmqMessage(expected.getBytes(), "text/plain"); + public void testConvertEmptyStringMessageBody() throws Exception + { + doTestTextMessage(null, "text/plain"); + } - final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + public void testConvertStringXmlMessageBody() throws Exception + { + doTestTextMessage("<helloworld></helloworld>", "text/xml"); + } - final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + public void testConvertEmptyStringXmlMessageBody() throws Exception + { + doTestTextMessage(null, "text/xml"); + } - List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1); - assertEquals(expected, sections.get(0).getValue()); + public void testConvertEmptyStringApplicationXmlMessageBody() throws Exception + { + doTestTextMessage(null, "application/xml"); + } + + public void testConvertStringWithContentTypeText() throws Exception + { + doTestTextMessage("foo","text/foobar"); + } + + public void testConvertStringWithContentTypeApplicationXml() throws Exception + { + doTestTextMessage("<helloworld></helloworld>","application/xml"); + } + + public void testConvertStringWithContentTypeApplicationXmlDtd() throws Exception + { + doTestTextMessage("<!DOCTYPE name []>","application/xml-dtd"); + } + + public void testConvertStringWithContentTypeApplicationFooXml() throws Exception + { + doTestTextMessage("<helloworld></helloworld>","application/foo+xml"); + } + + public void testConvertStringWithContentTypeApplicationJson() throws Exception + { + doTestTextMessage("[]","application/json"); + } + + public void testConvertStringWithContentTypeApplicationFooJson() throws Exception + { + doTestTextMessage("[]","application/foo+json"); + } + + public void testConvertStringWithContentTypeApplicationJavascript() throws Exception + { + doTestTextMessage("var foo","application/javascript"); + } + + public void testConvertStringWithContentTypeApplicationEcmascript() throws Exception + { + doTestTextMessage("var foo","application/ecmascript"); } public void testConvertBytesMessageBody() throws Exception { - final byte[] expected = "helloworld".getBytes(); + doTestBytesMessage("helloworld".getBytes(), "application/octet-stream"); + } - final AMQMessage sourceMessage = getAmqMessage(expected, "application/octet-stream"); + public void testConvertBytesMessageBodyNoContentType() throws Exception + { + final byte[] messageContent = "helloworld".getBytes(); + doTest(messageContent, + null, + DataSection.class, + messageContent, + null, + null); + } - final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + public void testConvertBytesMessageBodyUnknownContentType() throws Exception + { + final byte[] messageContent = "helloworld".getBytes(); + doTest(messageContent, + "my/bytes", + DataSection.class, + messageContent, + Symbol.valueOf("my/bytes"), + null); + } - final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); - List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1); - final Binary value = (Binary) sections.get(0).getValue(); - assertArrayEquals(expected, value.getArray()); + public void testConvertEmptyBytesMessageBody() throws Exception + { + doTestBytesMessage(new byte[0], "application/octet-stream"); } - public void testConvertListMessageBody() throws Exception + public void testConvertJmsStreamMessageBody() throws Exception { - final List<Object> expected = Lists.<Object>newArrayList("apple", 43, 31.42D); + final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D); final byte[] messageBytes = getJmsStreamMessageBytes(expected); - final AMQMessage sourceMessage = getAmqMessage(messageBytes, "jms/stream-message"); + final String mimeType = "jms/stream-message"; + doTestStreamMessage(messageBytes, mimeType, expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType()); + } - final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + public void testConvertAmqpListMessageBody() throws Exception + { + final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D); + final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected); - final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + final String mimeType = "amqp/list"; + doTestStreamMessage(messageBytes, mimeType, expected, JmsMessageTypeAnnotation.STREAM_MESSAGE.getType()); + } - List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1); - assertEquals(expected, sections.get(0).getValue()); + public void testConvertAmqpListMessageBodyWithNonJmsContent() throws Exception + { + final List<Object> expected = Lists.newArrayList("apple", 43, 31.42D, Lists.newArrayList("nonJMSList")); + final byte[] messageBytes = new ListToAmqpListConverter().toMimeContent(expected); + + final String mimeType = "amqp/list"; + doTestStreamMessage(messageBytes, mimeType, expected, null); } - public void testConvertMapMessageBody() throws Exception + public void testConvertJmsMapMessageBody() throws Exception { - final Map<String, Object> expected = Collections.<String, Object>singletonMap("key", "value"); + final Map<String, Object> expected = Collections.singletonMap("key", "value"); final byte[] messageBytes = getJmsMapMessageBytes(expected); - final AMQMessage sourceMessage = getAmqMessage(messageBytes, "jms/map-message"); + doTestMapMessage(messageBytes, "jms/map-message", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType()); + } - final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + public void testConvertAmqpMapMessageBody() throws Exception + { + final Map<String, Object> expected = Collections.singletonMap("key", "value"); + final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected); - final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + doTestMapMessage(messageBytes, "amqp/map", expected, JmsMessageTypeAnnotation.MAP_MESSAGE.getType()); + } - List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1); - assertEquals(expected, sections.get(0).getValue()); + public void testConvertAmqpMapMessageBodyWithNonJmsContent() throws Exception + { + final Map<String, Object> expected = Collections.singletonMap("key", Collections.singletonList("nonJmsList")); + final byte[] messageBytes = new MapToAmqpMapConverter().toMimeContent(expected); + + doTestMapMessage(messageBytes, "amqp/map", expected, null); } public void testConvertObjectStreamMessageBody() throws Exception { final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID()); + final byte[] expectedBytes = messageBytes; - final AMQMessage sourceMessage = getAmqMessage(messageBytes, "application/java-object-stream"); + doTestObjectMessage(messageBytes, "application/java-object-stream", expectedBytes); + } - final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + public void testConvertObjectStream2MessageBody() throws Exception + { + final byte[] messageBytes = getObjectStreamMessageBytes(UUID.randomUUID()); + final byte[] expectedBytes = messageBytes; - final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + doTestObjectMessage(messageBytes, "application/x-java-serialized-object", expectedBytes); + } - List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1); - final Binary value = (Binary) sections.get(0).getValue(); - assertArrayEquals(messageBytes, value.getArray()); + public void testConvertEmptyObjectStreamMessageBody() throws Exception + { + final byte[] messageBytes = null; + final byte[] expectedBytes = getObjectStreamMessageBytes(messageBytes); + final String mimeType = "application/java-object-stream"; + + doTestObjectMessage(messageBytes, mimeType, expectedBytes); + } + + public void testConvertEmptyMessageWithoutContentType() throws Exception + { + doTest(null, null, AmqpValueSection.class, null, null, JmsMessageTypeAnnotation.MESSAGE.getType()); } + public void testConvertEmptyMessageWithUnknownContentType() throws Exception + { + doTest(null, "foo/bar", DataSection.class, new byte[0], Symbol.valueOf("foo/bar"), null); + } + + public void testConvertMessageWithoutContentType() throws Exception + { + final byte[] expectedContent = "someContent".getBytes(UTF_8); + doTest(expectedContent, null, DataSection.class, expectedContent, null, null); + } + + private byte[] getObjectStreamMessageBytes(final Serializable o) throws Exception { try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -172,18 +299,18 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase private byte[] getJmsStreamMessageBytes(List<Object> objects) throws Exception { TypedBytesContentWriter writer = new TypedBytesContentWriter(); - for(Object o : objects) + for (Object o : objects) { writer.writeObject(o); } return getBytes(writer); } - private byte[] getJmsMapMessageBytes(Map<String,Object> map) throws Exception + private byte[] getJmsMapMessageBytes(Map<String, Object> map) throws Exception { TypedBytesContentWriter writer = new TypedBytesContentWriter(); writer.writeIntImpl(map.size()); - for(Map.Entry<String, Object> entry : map.entrySet()) + for (Map.Entry<String, Object> entry : map.entrySet()) { writer.writeNullTerminatedStringImpl(entry.getKey()); writer.writeObject(entry.getValue()); @@ -207,7 +334,6 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase final List<EncodingRetainingSection<?>> sections = sectionDecoder.parseAll(new ArrayList<>(content)); assertEquals("Unexpected number of sections", expectedNumberOfSections, sections.size()); return sections; - } protected AMQMessage getAmqMessage(final byte[] expected, final String mimeType) @@ -221,16 +347,22 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase private void configureMessageHeader(final String mimeType) { when(_header.getMimeType()).thenReturn(mimeType); + when(_basicContentHeaderProperties.getContentTypeAsString()).thenReturn(mimeType); } - private void configureMessageContent(final byte[] section) + private void configureMessageContent(byte[] section) { + if (section == null) + { + section = new byte[0]; + } final QpidByteBuffer combined = QpidByteBuffer.wrap(section); - when(_handle.getContentSize()).thenReturn((int) section.length); + when(_handle.getContentSize()).thenReturn(section.length); final ArgumentCaptor<Integer> offsetCaptor = ArgumentCaptor.forClass(Integer.class); final ArgumentCaptor<Integer> sizeCaptor = ArgumentCaptor.forClass(Integer.class); - when(_handle.getContent(offsetCaptor.capture(), sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>() + when(_handle.getContent(offsetCaptor.capture(), + sizeCaptor.capture())).then(new Answer<Collection<QpidByteBuffer>>() { @Override public Collection<QpidByteBuffer> answer(final InvocationOnMock invocation) throws Throwable @@ -241,4 +373,122 @@ public class MessageConverter_0_8_to_1_0Test extends QpidTestCase }); } + private Byte getJmsMessageTypeAnnotation(final Message_1_0 convertedMessage) + { + MessageAnnotationsSection messageAnnotationsSection = convertedMessage.getMessageAnnotationsSection(); + if (messageAnnotationsSection != null) + { + Map<Symbol, Object> messageAnnotations = messageAnnotationsSection.getValue(); + if (messageAnnotations != null) + { + Object annotation = messageAnnotations.get(Symbol.valueOf("x-opt-jms-msg-type")); + if (annotation instanceof Byte) + { + return ((Byte) annotation); + } + } + } + return null; + } + + private void doTestTextMessage(final String originalContent, final String mimeType) throws Exception + { + final byte[] contentBytes = originalContent == null ? null : originalContent.getBytes(UTF_8); + String expectedContent = originalContent == null ? null : originalContent; + doTest(contentBytes, + mimeType, + AmqpValueSection.class, + expectedContent, + Symbol.valueOf(mimeType), + JmsMessageTypeAnnotation.TEXT_MESSAGE.getType()); + } + + + private void doTestMapMessage(final byte[] messageBytes, + final String mimeType, + final Map<String, Object> expected, + final Byte expectedJmsTypeAnnotation) throws Exception + { + doTest(messageBytes, mimeType, AmqpValueSection.class, expected, null, expectedJmsTypeAnnotation); + } + + private void doTestBytesMessage(final byte[] messageContent, final String mimeType) throws Exception + { + doTest(messageContent, + mimeType, + DataSection.class, + messageContent, + Symbol.valueOf(mimeType), + JmsMessageTypeAnnotation.BYTES_MESSAGE.getType()); + } + + private void doTestStreamMessage(final byte[] messageBytes, + final String mimeType, + final List<Object> expected, + final Byte expectedJmsTypAnnotation) throws Exception + { + doTest(messageBytes, mimeType, AmqpSequenceSection.class, expected, null, expectedJmsTypAnnotation); + } + + private void doTestObjectMessage(final byte[] messageBytes, + final String mimeType, + final byte[] expectedBytes) + throws Exception + { + doTest(messageBytes, + mimeType, + DataSection.class, + expectedBytes, + Symbol.valueOf("application/x-java-serialized-object"), + JmsMessageTypeAnnotation.OBJECT_MESSAGE.getType()); + } + + private void doTest(final byte[] messageBytes, + final String mimeType, + final Class<? extends EncodingRetainingSection<?>> expectedBodySection, + final Object expectedContent, + final Symbol expectedContentType, + final Byte expectedJmsTypeAnnotation) throws Exception + { + final AMQMessage sourceMessage = getAmqMessage(messageBytes, mimeType); + final Message_1_0 convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + + List<EncodingRetainingSection<?>> sections = getEncodingRetainingSections(content, 1); + EncodingRetainingSection<?> encodingRetainingSection = sections.get(0); + assertEquals("Unexpected section type", expectedBodySection, encodingRetainingSection.getClass()); + + if (expectedContent instanceof byte[]) + { + assertArrayEquals("Unexpected content", + ((byte[]) expectedContent), + ((Binary) encodingRetainingSection.getValue()).getArray()); + } + else + { + assertEquals("Unexpected content", expectedContent, encodingRetainingSection.getValue()); + } + + Symbol contentType = getContentType(convertedMessage); + if (expectedContentType == null) + { + assertNull("Content type should be null", contentType); + } + else + { + assertEquals("Unexpected content type", expectedContentType, contentType); + } + + Byte jmsMessageTypeAnnotation = getJmsMessageTypeAnnotation(convertedMessage); + if (expectedJmsTypeAnnotation == null) + { + assertNull("Unexpected annotation 'x-opt-jms-msg-type'", jmsMessageTypeAnnotation); + } + else + { + assertEquals("Unexpected annotation 'x-opt-jms-msg-type'", + expectedJmsTypeAnnotation, + jmsMessageTypeAnnotation); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/20c9c58c/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java index d272c87..c9d17cc 100644 --- a/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java +++ b/broker-plugins/amqp-msg-conv-0-8-to-1-0/src/test/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_1_0_to_v0_8Test.java @@ -93,11 +93,12 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase _converter = new MessageConverter_1_0_to_v0_8(); } - public void testAmqpValueWithNull() throws Exception + public void testAmqpValueWithNullWithTextMessageAnnotation() throws Exception { final Object expected = null; final AmqpValue amqpValue = new AmqpValue(expected); - Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection()); + Message_1_0 sourceMessage = + createTestMessage(TEXT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); @@ -105,16 +106,153 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize()); } - public void testAmqpValueWithNullWithTextMessageAnnotation() throws Exception + public void testAmqpValueWithNullWithMessageAnnotation() throws Exception { final Object expected = null; final AmqpValue amqpValue = new AmqpValue(expected); Message_1_0 sourceMessage = - createTestMessage(TEXT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); + createTestMessage(MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); - assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize()); + } + + public void testAmqpValueWithNullWithObjectMessageAnnotation() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(OBJECT_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); + + final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + + assertEquals("Unexpected mime type", + "application/java-object-stream", + convertedMessage.getMessageHeader().getMimeType()); + assertArrayEquals("Unexpected content size", getObjectBytes(null), getBytes(content)); + } + + public void testAmqpValueWithNullWithMapMessageAnnotation() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(MAP_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); + + final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + + assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType()); + assertArrayEquals("Unexpected content size", + new MapToJmsMapMessage().toMimeContent(Collections.emptyMap()), + getBytes(content)); + } + + public void testAmqpValueWithNullWithBytesMessageAnnotation() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(BYTE_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); + + final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", + "application/octet-stream", + convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize()); + } + + public void testAmqpValueWithNullWithStreamMessageAnnotation() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(STREAM_MESSAGE_MESSAGE_ANNOTATION, amqpValue.createEncodingRetainingSection()); + + final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", "jms/stream-message", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize()); + } + + public void testAmqpValueWithNullWithUnknownMessageAnnotation() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = + createTestMessage(new MessageAnnotations(Collections.singletonMap(Symbol.valueOf("x-opt-jms-msg-type"), + (byte) 11)), + amqpValue.createEncodingRetainingSection()); + + final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize()); + } + + public void testAmqpValueWithNullWithContentTypeApplicationOctetStream() throws Exception + { + Properties properties = new Properties(); + properties.setContentType(Symbol.valueOf("application/octet-stream")); + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection()); + + final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", "application/octet-stream", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize()); + } + + public void testAmqpValueWithNullWithObjectMessageContentType() throws Exception + { + final Properties properties = new Properties(); + properties.setContentType(Symbol.valueOf("application/x-java-serialized-object")); + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection()); + + final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", + "application/java-object-stream", + convertedMessage.getMessageHeader().getMimeType()); + + assertEquals("Unexpected content size", + getObjectBytes(null).length, + convertedMessage.getMessageMetaData().getContentSize()); + } + + public void testAmqpValueWithNullWithJmsMapContentType() throws Exception + { + final Properties properties = new Properties(); + properties.setContentType(Symbol.valueOf("jms/map-message")); + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(properties, amqpValue.createEncodingRetainingSection()); + + final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + final Collection<QpidByteBuffer> content = convertedMessage.getContent(0, (int) convertedMessage.getSize()); + + assertEquals("Unexpected mime type", "jms/map-message", convertedMessage.getMessageHeader().getMimeType()); + + assertArrayEquals("Unexpected content size", + new MapToJmsMapMessage().toMimeContent(Collections.emptyMap()), + getBytes(content)); + } + + public void testAmqpValueWithNull() throws Exception + { + final Object expected = null; + final AmqpValue amqpValue = new AmqpValue(expected); + Message_1_0 sourceMessage = createTestMessage(amqpValue.createEncodingRetainingSection()); + + final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); + + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize()); } @@ -630,7 +768,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); - assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize()); } @@ -702,7 +840,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); - assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize()); } @@ -712,7 +850,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); - assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected mime type", null, convertedMessage.getMessageHeader().getMimeType()); assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize()); } @@ -724,7 +862,7 @@ public class MessageConverter_1_0_to_v0_8Test extends QpidTestCase final AMQMessage convertedMessage = _converter.convert(sourceMessage, mock(NamedAddressSpace.class)); - assertEquals("Unexpected mime type", "text/plain", convertedMessage.getMessageHeader().getMimeType()); + assertEquals("Unexpected mime type", "application/octet-stream", convertedMessage.getMessageHeader().getMimeType()); assertEquals("Unexpected content size", 0, convertedMessage.getMessageMetaData().getContentSize()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org