https://issues.apache.org/jira/browse/AMQ-4930 - ensure we page in messages for browse/expire when destination stats are disabled via config
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/41659725
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/41659725
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/41659725

Branch: refs/heads/trunk
Commit: 41659725f4c4fa027386148077aa76c31d8853af
Parents: 6bdce73
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Aug 5 16:32:44 2014 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Wed Aug 6 15:21:19 2014 +0100

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java | 12 ++++++++--
 .../org/apache/activemq/bugs/AMQ4930Test.java | 24 +++++++++++++++-----
 2 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/41659725/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 647ba68..3cdd91c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1232,9 +1232,17 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         } finally {
             pagedInMessagesLock.readLock().unlock();
         }
-        LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, destinationStatistics.getMessages().getCount(), memoryUsage.getPercentUsage()});
+        int messagesInQueue = 0;
+        messagesLock.readLock().lock();
+        try {
+            messagesInQueue = messages.size();
+        } finally {
+            messagesLock.readLock().unlock();
+        }
+
+        LOG.trace("max {}, alreadyPagedIn {}, messagesCount {}, memoryUsage {}%", new Object[]{max, alreadyPagedIn, messagesInQueue, memoryUsage.getPercentUsage()});
         return (alreadyPagedIn < max)
-                && (alreadyPagedIn < destinationStatistics.getMessages().getCount())
+                && (alreadyPagedIn < messagesInQueue)
                 && messages.hasSpace();
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/41659725/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
index f75eae3..e6bea2a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java
@@ -30,6 +30,7 @@ import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +38,7 @@ public class AMQ4930Test extends TestCase {
     private static final Logger LOG = LoggerFactory.getLogger(AMQ4930Test.class);
     final int messageCount = 150;
     final int messageSize = 1024*1024;
+    final int maxBrowsePageSize = 50;
     final ActiveMQQueue bigQueue = new ActiveMQQueue("BIG");
     BrokerService broker;
     ActiveMQConnectionFactory factory;
@@ -50,8 +52,8 @@ public class AMQ4930Test extends TestCase {
         PolicyEntry policy = new PolicyEntry();
         // disable expriy processing as this will call browse in parallel
         policy.setExpireMessagesPeriod(0);
-        policy.setMaxPageSize(50);
-        policy.setMaxBrowsePageSize(50);
+        policy.setMaxPageSize(maxBrowsePageSize);
+        policy.setMaxBrowsePageSize(maxBrowsePageSize);
         pMap.setDefaultEntry(policy);
         broker.setDestinationPolicy(pMap);
 
@@ -65,6 +67,11 @@ public class AMQ4930Test extends TestCase {
         doTestBrowsePending(DeliveryMode.PERSISTENT);
     }
 
+    public void testWithStatsDisabled() throws Exception {
+        ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().setEnabled(false);
+        doTestBrowsePending(DeliveryMode.PERSISTENT);
+    }
+
     public void doTestBrowsePending(int deliveryMode) throws Exception {
 
         Connection connection = factory.createConnection();
@@ -77,7 +84,6 @@ public class AMQ4930Test extends TestCase {
 
         for (int i = 0; i < messageCount; i++) {
             producer.send(bigQueue, bytesMessage);
-            LOG.info("Sent: " + i);
         }
 
         final QueueViewMBean queueViewMBean = (QueueViewMBean)
@@ -94,15 +100,21 @@ public class AMQ4930Test extends TestCase {
 
         final Queue underTest = (Queue) ((RegionBroker)broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(bigQueue);
         // do twice to attempt to pull in 2*maxBrowsePageSize which uses up the system memory limit
-        underTest.browse();
-        underTest.browse();
+        Message[] browsed = underTest.browse();
+        LOG.info("Browsed: " + browsed.length);
+        assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
+        browsed = underTest.browse();
+        LOG.info("Browsed: " + browsed.length);
+        assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
         Runtime.getRuntime().gc();
         long free = Runtime.getRuntime().freeMemory()/1024;
         LOG.info("free at start of check: " + free);
         // check for memory growth
         for (int i=0; i<10; i++) {
             LOG.info("free: " + Runtime.getRuntime().freeMemory()/1024);
-            underTest.browse();
+            browsed = underTest.browse();
+            LOG.info("Browsed: " + browsed.length);
+            assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length);
             Runtime.getRuntime().gc();
             Runtime.getRuntime().gc();
             assertTrue("No growth: " + Runtime.getRuntime().freeMemory()/1024, Runtime.getRuntime().freeMemory()/1024 >= (free - (free * 0.1)));