Repository: activemq
Updated Branches:
  refs/heads/trunk c5f183548 -> 74f530a64
https://issues.apache.org/jira/browse/AMQ-5457 - fix and test - we now peek first in redeliveredWaitingDispatch Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/74f530a6 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/74f530a6 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/74f530a6 Branch: refs/heads/trunk Commit: 74f530a6410522df76e842a3f2498a3442db5281 Parents: c5f1835 Author: gtully <gary.tu...@gmail.com> Authored: Tue Nov 25 14:21:57 2014 +0000 Committer: gtully <gary.tu...@gmail.com> Committed: Tue Nov 25 14:23:33 2014 +0000 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 1 + .../apache/activemq/broker/jmx/MBeanTest.java | 57 ++++++++++++++++++++ 2 files changed, 58 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/74f530a6/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 5c7a988..43f02f8 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1121,6 +1121,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pageInMessages(!memoryUsage.isFull(110)); }; + doBrowseList(browseList, max, redeliveredWaitingDispatch, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch"); doBrowseList(browseList, max, pagedInPendingDispatch, pagedInPendingDispatchLock, connectionContext, "pagedInPendingDispatch"); doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, 
"pagedInMessages"); http://git-wip-us.apache.org/repos/asf/activemq/blob/74f530a6/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index 55b283f..8e94b48 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -44,7 +44,9 @@ import javax.management.openmbean.TabularData; import junit.textui.TestRunner; +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; import org.apache.activemq.EmbeddedBrokerTestSupport; @@ -1362,6 +1364,61 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { session.close(); } + public void testBrowseOrder() throws Exception { + connection = connectionFactory.createConnection(); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(20); + ((ActiveMQConnection) connection).setPrefetchPolicy(prefetchPolicy); + useConnection(connection); + + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + + CompositeData[] compdatalist = queue.browse(); + int initialQueueSize = compdatalist.length; + assertEquals("expected", MESSAGE_COUNT, initialQueueSize); + + int messageCount = initialQueueSize; + for (int i = 0; i < messageCount; i++) { + CompositeData cdata = compdatalist[i]; + 
String messageID = (String) cdata.get("JMSMessageID"); + assertNotNull("Should have a message ID for message " + i, messageID); + + Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES); + assertTrue("not empty", intProperties.size() > 0); + assertEquals("counter in order", i, intProperties.get("counter")); + } + + echo("Attempting to consume 5 bytes messages from: " + destination); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + for (int i=0; i<5; i++) { + Message message = consumer.receive(5000); + assertNotNull(message); + assertEquals("ordered", i, message.getIntProperty("counter")); + echo("Consumed: " + message.getIntProperty("counter")); + } + consumer.close(); + session.close(); + connection.close(); + + // browse again and verify order + compdatalist = queue.browse(); + initialQueueSize = compdatalist.length; + assertEquals("5 gone", MESSAGE_COUNT - 5, initialQueueSize); + + messageCount = initialQueueSize; + for (int i = 0; i < messageCount - 4; i++) { + CompositeData cdata = compdatalist[i]; + + Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES); + assertTrue("not empty", intProperties.size() > 0); + assertEquals("counter in order", i + 5, intProperties.get("counter")); + echo("Got: " + intProperties.get("counter")); + } + } + public void testAddRemoveConnectorBrokerView() throws Exception { ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");