This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 28d1a53  ARTEMIS-2508 Crititical analyser trigger shutdown if 
removeAllMessages
     new 68f419d  This closes #2855
28d1a53 is described below

commit 28d1a53630bea2ce9ef137cda4724fbf106dd96e
Author: brusdev <bruscin...@gmail.com>
AuthorDate: Tue Oct 8 06:26:16 2019 +0200

    ARTEMIS-2508 Crititical analyser trigger shutdown if removeAllMessages
    
    The crititical analyser trigger the broker shutdown if try to
    removeAllMessages with a huge queue. The iterQueue is split so as
    not to keep the lock too time.
---
 .../artemis/core/server/impl/QueueImpl.java        | 70 +++++++++---------
 .../tests/integration/paging/PagingTest.java       | 86 ++++++++++++++++++++++
 2 files changed, 122 insertions(+), 34 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index f48e430..881d398 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1915,7 +1915,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    @Override
-   public synchronized int deleteMatchingReferences(final int flushLimit, 
final Filter filter1, AckReason ackReason) throws Exception {
+   public int deleteMatchingReferences(final int flushLimit, final Filter 
filter1, AckReason ackReason) throws Exception {
       return iterQueue(flushLimit, filter1, 
createDeleteMatchingAction(ackReason));
    }
 
@@ -1947,7 +1947,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
     * @return
     * @throws Exception
     */
-   private synchronized int iterQueue(final int flushLimit,
+   private int iterQueue(final int flushLimit,
                                       final Filter filter1,
                                       QueueIterateAction messageAction) throws 
Exception {
       int count = 0;
@@ -1955,45 +1955,47 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
       Transaction tx = new TransactionImpl(storageManager);
 
-      try (LinkedListIterator<MessageReference> iter = iterator()) {
+      synchronized (this) {
+         try (LinkedListIterator<MessageReference> iter = iterator()) {
 
-         while (iter.hasNext()) {
-            MessageReference ref = iter.next();
+            while (iter.hasNext()) {
+               MessageReference ref = iter.next();
 
-            if (ref.isPaged() && queueDestroyed) {
-               // this means the queue is being removed
-               // hence paged references are just going away through
-               // page cleanup
-               continue;
-            }
+               if (ref.isPaged() && queueDestroyed) {
+                  // this means the queue is being removed
+                  // hence paged references are just going away through
+                  // page cleanup
+                  continue;
+               }
 
-            if (filter1 == null || filter1.match(ref.getMessage())) {
-               messageAction.actMessage(tx, ref);
-               iter.remove();
-               txCount++;
-               count++;
+               if (filter1 == null || filter1.match(ref.getMessage())) {
+                  messageAction.actMessage(tx, ref);
+                  iter.remove();
+                  txCount++;
+                  count++;
+               }
             }
-         }
 
-         if (txCount > 0) {
-            tx.commit();
+            if (txCount > 0) {
+               tx.commit();
 
-            tx = new TransactionImpl(storageManager);
+               tx = new TransactionImpl(storageManager);
 
-            txCount = 0;
-         }
+               txCount = 0;
+            }
 
-         List<MessageReference> cancelled = 
scheduledDeliveryHandler.cancel(filter1);
-         for (MessageReference messageReference : cancelled) {
-            messageAction.actMessage(tx, messageReference, false);
-            count++;
-            txCount++;
-         }
+            List<MessageReference> cancelled = 
scheduledDeliveryHandler.cancel(filter1);
+            for (MessageReference messageReference : cancelled) {
+               messageAction.actMessage(tx, messageReference, false);
+               count++;
+               txCount++;
+            }
 
-         if (txCount > 0) {
-            tx.commit();
-            tx = new TransactionImpl(storageManager);
-            txCount = 0;
+            if (txCount > 0) {
+               tx.commit();
+               tx = new TransactionImpl(storageManager);
+               txCount = 0;
+            }
          }
 
          if (pageIterator != null && !queueDestroyed) {
@@ -2350,7 +2352,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    @Override
-   public synchronized int moveReferences(final int flushLimit,
+   public int moveReferences(final int flushLimit,
                                           final Filter filter,
                                           final SimpleString toAddress,
                                           final boolean rejectDuplicates,
@@ -2384,7 +2386,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       });
    }
 
-   public synchronized int moveReferencesBetweenSnFQueues(final SimpleString 
queueSuffix) throws Exception {
+   public int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) 
throws Exception {
       return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
          @Override
          public void actMessage(Transaction tx, MessageReference ref) throws 
Exception {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
index 19eee08..8e816d2 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java
@@ -57,6 +57,8 @@ import 
org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.MessageHandler;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
@@ -493,6 +495,90 @@ public class PagingTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testQueueRemoveAll() throws Exception {
+      clearDataRecreateServerDirs();
+
+      Configuration config = 
createDefaultInVMConfig().setJournalSyncNonTransactional(false);
+
+      server = createServer(true, config, PagingTest.PAGE_SIZE, 
PagingTest.PAGE_MAX);
+
+      server.start();
+
+      final int numberOfMessages = 5000;
+
+      locator = 
createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+      sf = createSessionFactory(locator);
+
+      ClientSession session = sf.createSession(false, false, false);
+
+      session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      ClientMessage message = null;
+
+      byte[] body = new byte[MESSAGE_SIZE];
+
+      ByteBuffer bb = ByteBuffer.wrap(body);
+
+      for (int j = 1; j <= MESSAGE_SIZE; j++) {
+         bb.put(getSamplebyte(j));
+      }
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         producer.send(message);
+         if (i % 1000 == 0) {
+            session.commit();
+         }
+      }
+      session.commit();
+      producer.close();
+      session.close();
+
+      session = sf.createSession(false, false, false);
+      producer = session.createProducer(PagingTest.ADDRESS);
+      producer.send(session.createMessage(true));
+      session.rollback();
+      producer.close();
+      session.close();
+
+      session = sf.createSession(false, false, false);
+      producer = session.createProducer(PagingTest.ADDRESS);
+
+      for (int i = 0; i < numberOfMessages; i++) {
+         message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(body);
+
+         producer.send(message);
+         if (i % 1000 == 0) {
+            session.commit();
+         }
+      }
+      session.commit();
+      producer.close();
+      session.close();
+
+      Queue queue = server.locateQueue(PagingTest.ADDRESS);
+
+      Wait.assertEquals(numberOfMessages * 2, queue::getMessageCount);
+
+      QueueControl queueControl = (QueueControl) 
this.server.getManagementService().getResource(ResourceNames.QUEUE + 
PagingSendTest.ADDRESS);
+      int removedMessages = queueControl.removeAllMessages();
+
+      Assert.assertEquals(numberOfMessages * 2, removedMessages);
+   }
+
+   @Test
    public void testEmptyAddress() throws Exception {
       if (storeType == StoreConfiguration.StoreType.FILE) {
          clearDataRecreateServerDirs();

Reply via email to