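Fixed some OpenWire test failures in BrokerTest (preserve the original destination on composite sends; read queue message counts directly from the broker via ArtemisBrokerWrapper)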
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/610174b4 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/610174b4 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/610174b4 Branch: refs/heads/refactor-openwire Commit: 610174b42b84d1764a35ebe1ccec13afeab8e23d Parents: bdb27a9 Author: Howard Gao <howard....@gmail.com> Authored: Thu Feb 18 20:52:32 2016 +0800 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Thu Feb 25 18:10:23 2016 -0500 ---------------------------------------------------------------------- .../openwire/OpenWireMessageConverter.java | 7 +++++++ .../core/protocol/openwire/amq/AMQSession.java | 1 + .../artemiswrapper/ArtemisBrokerWrapper.java | 14 ++++++++++++++ .../org/apache/activemq/broker/BrokerTest.java | 19 +++++++++++++++---- 4 files changed, 37 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/610174b4/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index f61705e..d040955 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -391,6 +391,13 @@ public class OpenWireMessageConverter implements 
MessageConverter { coreMessage.putStringProperty(AMQ_MSG_USER_ID, userId); } coreMessage.putBooleanProperty(AMQ_MSG_DROPPABLE, messageSend.isDroppable()); + + ActiveMQDestination origDest = messageSend.getOriginalDestination(); + if (origDest != null) { + ByteSequence origDestBytes = marshaller.marshal(origDest); + origDestBytes.compact(); + coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data); + } } private static void loadMapIntoProperties(TypedProperties props, Map<String, Object> map) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/610174b4/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 701c9ce..fe8d3c4 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -255,6 +255,7 @@ public class AMQSession implements SessionCallback { ActiveMQDestination[] actualDestinations = null; if (destination.isComposite()) { actualDestinations = destination.getCompositeDestinations(); + messageSend.setOriginalDestination(destination); } else { actualDestinations = new ActiveMQDestination[]{destination}; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/610174b4/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java ---------------------------------------------------------------------- diff --git 
a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java index 3ad6072..5cb5048 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java @@ -29,9 +29,12 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration; +import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.registry.JndiBindingRegistry; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; @@ -257,4 +260,15 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase { } } } + + public long getAMQueueMessageCount(String physicalName) { + long count = 0; + String qname = "jms.queue." 
+ physicalName; + Binding binding = server.getPostOffice().getBinding(new SimpleString(qname)); + if (binding != null) { + QueueImpl q = (QueueImpl) binding.getBindable(); + count = q.getMessageCount(); + } + return count; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/610174b4/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java index 1e83319..9f412a9 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/BrokerTest.java @@ -24,6 +24,7 @@ import javax.jms.DeliveryMode; import junit.framework.Test; +import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -454,9 +455,13 @@ public class BrokerTest extends BrokerTestSupport { // Commit the transaction. 
connection1.send(createCommitTransaction1Phase(connectionInfo1, txid)); + //due to async tx operations, we need some time for message count to go down + Thread.sleep(1000); + ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker(); + long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); // The queue should now only have the remaining 2 messages - assertEquals(2, countMessagesInQueue(connection1, connectionInfo1, destination)); + assertEquals(2, messageCount); } public void initCombosForTestConsumerCloseCausesRedelivery() { @@ -1463,11 +1468,17 @@ public class BrokerTest extends BrokerTestSupport { assertNotNull(m); assertEquals(m.getMessageId(), message1.getMessageId()); - assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 2); + ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker(); + long messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); + assertTrue(messageCount == 2); connection.send(createAck(consumerInfo, m, 1, MessageAck.DELIVERED_ACK_TYPE)); - assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 2); + messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); + assertTrue(messageCount == 2); connection.send(createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE)); - assertTrue(countMessagesInQueue(connection, connectionInfo, destination) == 1); + //give some time for broker to count down + Thread.sleep(2000); + messageCount = wrapper.getAMQueueMessageCount(destination.getPhysicalName()); + assertTrue(messageCount == 1); }