This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new 28d1a53 ARTEMIS-2508 Crititical analyser trigger shutdown if removeAllMessages new 68f419d This closes #2855 28d1a53 is described below commit 28d1a53630bea2ce9ef137cda4724fbf106dd96e Author: brusdev <bruscin...@gmail.com> AuthorDate: Tue Oct 8 06:26:16 2019 +0200 ARTEMIS-2508 Crititical analyser trigger shutdown if removeAllMessages The critical analyser triggers a broker shutdown when removeAllMessages is called on a huge queue. The iterQueue method is split so that the lock is not held for too long. --- .../artemis/core/server/impl/QueueImpl.java | 70 +++++++++--------- .../tests/integration/paging/PagingTest.java | 86 ++++++++++++++++++++++ 2 files changed, 122 insertions(+), 34 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index f48e430..881d398 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1915,7 +1915,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override - public synchronized int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception { + public int deleteMatchingReferences(final int flushLimit, final Filter filter1, AckReason ackReason) throws Exception { return iterQueue(flushLimit, filter1, createDeleteMatchingAction(ackReason)); } @@ -1947,7 +1947,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { * @return * @throws Exception */ - private synchronized int iterQueue(final int flushLimit, + private int iterQueue(final int flushLimit, final Filter filter1, QueueIterateAction messageAction) throws Exception { int count = 0; @@ -1955,45 +1955,47 @@ public class QueueImpl extends 
CriticalComponentImpl implements Queue { Transaction tx = new TransactionImpl(storageManager); - try (LinkedListIterator<MessageReference> iter = iterator()) { + synchronized (this) { + try (LinkedListIterator<MessageReference> iter = iterator()) { - while (iter.hasNext()) { - MessageReference ref = iter.next(); + while (iter.hasNext()) { + MessageReference ref = iter.next(); - if (ref.isPaged() && queueDestroyed) { - // this means the queue is being removed - // hence paged references are just going away through - // page cleanup - continue; - } + if (ref.isPaged() && queueDestroyed) { + // this means the queue is being removed + // hence paged references are just going away through + // page cleanup + continue; + } - if (filter1 == null || filter1.match(ref.getMessage())) { - messageAction.actMessage(tx, ref); - iter.remove(); - txCount++; - count++; + if (filter1 == null || filter1.match(ref.getMessage())) { + messageAction.actMessage(tx, ref); + iter.remove(); + txCount++; + count++; + } } - } - if (txCount > 0) { - tx.commit(); + if (txCount > 0) { + tx.commit(); - tx = new TransactionImpl(storageManager); + tx = new TransactionImpl(storageManager); - txCount = 0; - } + txCount = 0; + } - List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter1); - for (MessageReference messageReference : cancelled) { - messageAction.actMessage(tx, messageReference, false); - count++; - txCount++; - } + List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter1); + for (MessageReference messageReference : cancelled) { + messageAction.actMessage(tx, messageReference, false); + count++; + txCount++; + } - if (txCount > 0) { - tx.commit(); - tx = new TransactionImpl(storageManager); - txCount = 0; + if (txCount > 0) { + tx.commit(); + tx = new TransactionImpl(storageManager); + txCount = 0; + } } if (pageIterator != null && !queueDestroyed) { @@ -2350,7 +2352,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } @Override 
- public synchronized int moveReferences(final int flushLimit, + public int moveReferences(final int flushLimit, final Filter filter, final SimpleString toAddress, final boolean rejectDuplicates, @@ -2384,7 +2386,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { }); } - public synchronized int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception { + public int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception { return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() { @Override public void actMessage(Transaction tx, MessageReference ref) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java index 19eee08..8e816d2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PagingTest.java @@ -57,6 +57,8 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.MessageHandler; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.client.impl.ClientConsumerInternal; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.DivertConfiguration; @@ -493,6 +495,90 @@ public class PagingTest extends ActiveMQTestBase { } @Test + public void testQueueRemoveAll() throws Exception { + clearDataRecreateServerDirs(); + + Configuration config = 
createDefaultInVMConfig().setJournalSyncNonTransactional(false); + + server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX); + + server.start(); + + final int numberOfMessages = 5000; + + locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true); + + sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, false, false); + + session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true); + + ClientProducer producer = session.createProducer(PagingTest.ADDRESS); + + ClientMessage message = null; + + byte[] body = new byte[MESSAGE_SIZE]; + + ByteBuffer bb = ByteBuffer.wrap(body); + + for (int j = 1; j <= MESSAGE_SIZE; j++) { + bb.put(getSamplebyte(j)); + } + + for (int i = 0; i < numberOfMessages; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(body); + + producer.send(message); + if (i % 1000 == 0) { + session.commit(); + } + } + session.commit(); + producer.close(); + session.close(); + + session = sf.createSession(false, false, false); + producer = session.createProducer(PagingTest.ADDRESS); + producer.send(session.createMessage(true)); + session.rollback(); + producer.close(); + session.close(); + + session = sf.createSession(false, false, false); + producer = session.createProducer(PagingTest.ADDRESS); + + for (int i = 0; i < numberOfMessages; i++) { + message = session.createMessage(true); + + ActiveMQBuffer bodyLocal = message.getBodyBuffer(); + + bodyLocal.writeBytes(body); + + producer.send(message); + if (i % 1000 == 0) { + session.commit(); + } + } + session.commit(); + producer.close(); + session.close(); + + Queue queue = server.locateQueue(PagingTest.ADDRESS); + + Wait.assertEquals(numberOfMessages * 2, queue::getMessageCount); + + QueueControl queueControl = (QueueControl) this.server.getManagementService().getResource(ResourceNames.QUEUE + 
PagingSendTest.ADDRESS); + int removedMessages = queueControl.removeAllMessages(); + + Assert.assertEquals(numberOfMessages * 2, removedMessages); + } + + @Test public void testEmptyAddress() throws Exception { if (storeType == StoreConfiguration.StoreType.FILE) { clearDataRecreateServerDirs();