ARTEMIS-1657 - Properly decrement memory usage when moving messages from
queue

When messages are retried and moved from a DLQ to the original queue, the
memory usage tracker needs to be decremented.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/58648715
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/58648715
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/58648715

Branch: refs/heads/master
Commit: 586487155ca90aca1f42464eaedff7b0ae9a3a5b
Parents: 26a28d0
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Fri Feb 2 07:39:50 2018 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Fri Feb 2 11:13:01 2018 -0500

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     |  4 +-
 .../management/QueueControlTest.java            | 46 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58648715/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a454be0..c527c46 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1813,6 +1813,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
             if (!ignored) {
                move(null, toAddress, binding, ref, rejectDuplicates, 
AckReason.NORMAL);
+               refRemoved(ref);
                //move(toAddress, tx, ref, false, rejectDuplicates);
             }
          }
@@ -1861,9 +1862,8 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                   move(SimpleString.toSimpleString(originalMessageAddress), 
tx, ref, false, false, targetQueue.longValue());
                } else {
                   move(SimpleString.toSimpleString(originalMessageAddress), 
tx, ref, false, false);
-
                }
-
+               refRemoved(ref);
             }
          }
       });

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58648715/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index cbfba49..9c9e6f5 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -18,6 +18,7 @@ package 
org.apache.activemq.artemis.tests.integration.management;
 
 import static 
org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
 
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
@@ -25,6 +26,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.json.JsonArray;
 import javax.json.JsonObject;
@@ -52,9 +54,11 @@ import 
org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import 
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServers;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import 
org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
 import org.apache.activemq.artemis.utils.Base64;
@@ -977,6 +981,22 @@ public class QueueControlTest extends ManagementTestBase {
 
       session.start();
 
+      final LocalQueueBinding binding = (LocalQueueBinding) 
server.getPostOffice().getBinding(qName);
+      Queue q = binding.getQueue();
+      final LocalQueueBinding binding2 = (LocalQueueBinding) 
server.getPostOffice().getBinding(dlq);
+      Queue q2 = binding2.getQueue();
+
+      Field queueMemorySizeField = 
QueueImpl.class.getDeclaredField("queueMemorySize");
+      queueMemorySizeField.setAccessible(true);
+
+      //Get memory size counters to verify
+      AtomicInteger queueMemorySize1 = (AtomicInteger) 
queueMemorySizeField.get(q);
+      AtomicInteger queueMemorySize2 = (AtomicInteger) 
queueMemorySizeField.get(q2);
+
+      //Verify that original queue has a memory size greater than 0 and DLQ is 0
+      assertTrue(queueMemorySize1.get() > 0);
+      assertTrue(queueMemorySize2.get() == 0);
+
       // Read and rollback all messages to DLQ
       ClientConsumer clientConsumer = session.createConsumer(qName);
       for (int i = 0; i < numMessagesToTest; i++) {
@@ -989,6 +1009,10 @@ public class QueueControlTest extends ManagementTestBase {
 
       Assert.assertNull(clientConsumer.receiveImmediate());
 
+      //Verify that original queue has a memory size of 0 and DLQ is greater 
than 0 after rollback
+      assertTrue(queueMemorySize1.get() == 0);
+      assertTrue(queueMemorySize2.get() > 0);
+
       QueueControl dlqQueueControl = createManagementControl(dla, dlq);
       Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl));
 
@@ -998,6 +1022,10 @@ public class QueueControlTest extends ManagementTestBase {
       // Assert DLQ is empty...
       Assert.assertEquals(0, getMessageCount(dlqQueueControl));
 
+      //Verify that original queue has a memory size of greater than 0 and DLQ 
is 0 after move
+      assertTrue(queueMemorySize1.get() > 0);
+      assertTrue(queueMemorySize2.get() == 0);
+
       // .. and that the messages is now on the original queue once more.
       for (int i = 0; i < numMessagesToTest; i++) {
          ClientMessage clientMessage = clientConsumer.receive(500);
@@ -1007,6 +1035,10 @@ public class QueueControlTest extends ManagementTestBase 
{
       }
 
       clientConsumer.close();
+
+      //Verify that original queue and DLQ have a memory size of 0
+      assertTrue(queueMemorySize1.get() == 0);
+      assertTrue(queueMemorySize2.get() == 0);
    }
 
    /**
@@ -1035,14 +1067,28 @@ public class QueueControlTest extends 
ManagementTestBase {
       message.putLongProperty(key, value);
       producer.send(message);
 
+      final LocalQueueBinding binding = (LocalQueueBinding) 
server.getPostOffice().getBinding(queue);
+      Queue q = binding.getQueue();
+      Field queueMemorySizeField = 
QueueImpl.class.getDeclaredField("queueMemorySize");
+      queueMemorySizeField.setAccessible(true);
+
+      //Get memory size counters to verify
+      AtomicInteger queueMemorySize = (AtomicInteger) 
queueMemorySizeField.get(q);
+
       QueueControl queueControl = createManagementControl(address, queue);
       Assert.assertEquals(1, getMessageCount(queueControl));
 
+      //verify memory usage is greater than 0
+      Assert.assertTrue(queueMemorySize.get() > 0);
+
       // moved all messages to otherQueue
       int movedMessagesCount = queueControl.moveMessages(null, 
otherQueue.toString());
       Assert.assertEquals(1, movedMessagesCount);
       Assert.assertEquals(0, getMessageCount(queueControl));
 
+      //verify memory usage is 0 after move
+      Assert.assertEquals(0, queueMemorySize.get());
+
       // check there is no message to consume from queue
       consumeMessages(0, session, queue);
 

Reply via email to