ARTEMIS-1657 - Properly decrement memory usage when moving messages from queue
When messages are retried and moved from a DLQ to the original queue, the memory usage tracker needs to be decremented. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/58648715 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/58648715 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/58648715 Branch: refs/heads/master Commit: 586487155ca90aca1f42464eaedff7b0ae9a3a5b Parents: 26a28d0 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Fri Feb 2 07:39:50 2018 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Fri Feb 2 11:13:01 2018 -0500 ---------------------------------------------------------------------- .../artemis/core/server/impl/QueueImpl.java | 4 +- .../management/QueueControlTest.java | 46 ++++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58648715/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index a454be0..c527c46 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1813,6 +1813,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { if (!ignored) { move(null, toAddress, binding, ref, rejectDuplicates, AckReason.NORMAL); + refRemoved(ref); //move(toAddress, tx, ref, false, rejectDuplicates); } } @@ -1861,9 +1862,8 @@ public class QueueImpl extends 
CriticalComponentImpl implements Queue { move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false, targetQueue.longValue()); } else { move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false); - } - + refRemoved(ref); } } }); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58648715/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index cbfba49..9c9e6f5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.management; import static org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; @@ -25,6 +26,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.json.JsonArray; import javax.json.JsonObject; @@ -52,9 +54,11 @@ import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl; +import 
org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil; import org.apache.activemq.artemis.utils.Base64; @@ -977,6 +981,22 @@ public class QueueControlTest extends ManagementTestBase { session.start(); + final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(qName); + Queue q = binding.getQueue(); + final LocalQueueBinding binding2 = (LocalQueueBinding) server.getPostOffice().getBinding(dlq); + Queue q2 = binding2.getQueue(); + + Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize"); + queueMemorySizeField.setAccessible(true); + + //Get memory size counters to verify + AtomicInteger queueMemorySize1 = (AtomicInteger) queueMemorySizeField.get(q); + AtomicInteger queueMemorySize2 = (AtomicInteger) queueMemorySizeField.get(q2); + + //Verify that original queue has a memory size greater than 0 and DLQ is 0 + assertTrue(queueMemorySize1.get() > 0); + assertTrue(queueMemorySize2.get() == 0); + // Read and rollback all messages to DLQ ClientConsumer clientConsumer = session.createConsumer(qName); for (int i = 0; i < numMessagesToTest; i++) { @@ -989,6 +1009,10 @@ public class QueueControlTest extends ManagementTestBase { Assert.assertNull(clientConsumer.receiveImmediate()); + //Verify that original queue has a memory size of 0 and DLQ is greater than 0 after rollback + assertTrue(queueMemorySize1.get() == 0); + assertTrue(queueMemorySize2.get() > 0); + QueueControl dlqQueueControl = createManagementControl(dla, dlq); Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl)); @@ -998,6 +1022,10 
@@ public class QueueControlTest extends ManagementTestBase { // Assert DLQ is empty... Assert.assertEquals(0, getMessageCount(dlqQueueControl)); + //Verify that original queue has a memory size of greater than 0 and DLQ is 0 after move + assertTrue(queueMemorySize1.get() > 0); + assertTrue(queueMemorySize2.get() == 0); + // .. and that the messages is now on the original queue once more. for (int i = 0; i < numMessagesToTest; i++) { ClientMessage clientMessage = clientConsumer.receive(500); @@ -1007,6 +1035,10 @@ public class QueueControlTest extends ManagementTestBase { } clientConsumer.close(); + + //Verify that original queue and DLQ have a memory size of 0 + assertTrue(queueMemorySize1.get() == 0); + assertTrue(queueMemorySize2.get() == 0); } /** @@ -1035,14 +1067,28 @@ public class QueueControlTest extends ManagementTestBase { message.putLongProperty(key, value); producer.send(message); + final LocalQueueBinding binding = (LocalQueueBinding) server.getPostOffice().getBinding(queue); + Queue q = binding.getQueue(); + Field queueMemorySizeField = QueueImpl.class.getDeclaredField("queueMemorySize"); + queueMemorySizeField.setAccessible(true); + + //Get memory size counters to verify + AtomicInteger queueMemorySize = (AtomicInteger) queueMemorySizeField.get(q); + QueueControl queueControl = createManagementControl(address, queue); Assert.assertEquals(1, getMessageCount(queueControl)); + //verify memory usage is greater than 0 + Assert.assertTrue(queueMemorySize.get() > 0); + // moved all messages to otherQueue int movedMessagesCount = queueControl.moveMessages(null, otherQueue.toString()); Assert.assertEquals(1, movedMessagesCount); Assert.assertEquals(0, getMessageCount(queueControl)); + //verify memory usage is 0 after move + Assert.assertEquals(0, queueMemorySize.get()); + // check there is no message to consume from queue consumeMessages(0, session, queue);