This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 8.0.x in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/8.0.x by this push:
     new 0a1c200 QPID-8514: [Broker-J] Delete next available entry if the least significant entry is acquired by the consumer
0a1c200 is described below

commit 0a1c200a763ea190f1471b69b6cd5abf42100249
Author: Alex Rudyy <oru...@apache.org>
AuthorDate: Sun Apr 25 15:20:14 2021 +0100

    QPID-8514: [Broker-J] Delete next available entry if the least significant entry is acquired by the consumer
---
 .../qpid/server/queue/AbstractQueueEntryList.java | 1 +
 .../server/queue/RingOverflowPolicyHandler.java | 65 +++++--
 .../queue/RingOverflowPolicyHandlerTest.java | 25 ++-
 .../qpid/server/queue/RingOverflowPolicyTest.java | 213 +++++++++++++++++++++
 4 files changed, 282 insertions(+), 22 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
index 7c110f8..bbae28a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
@@ -81,6 +81,7 @@ abstract class AbstractQueueEntryList implements QueueEntryList { case AVAILABLE: queueStatistics.addToAvailable(sizeWithHeader); + _queue.checkCapacity(); break; case ACQUIRED: if(isConsumerAcquired && !wasConsumerAcquired)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
index 7a43dab..c64f875 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
@@ -19,13 +19,20 @@ package org.apache.qpid.server.queue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.qpid.server.logging.EventLogger; import
org.apache.qpid.server.logging.messages.QueueMessages; import org.apache.qpid.server.model.OverflowPolicy; import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; +import org.apache.qpid.server.txn.ServerTransaction; public class RingOverflowPolicyHandler implements OverflowPolicyHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(RingOverflowPolicyHandler.class); private final Handler _handler; RingOverflowPolicyHandler(final Queue<?> queue, @@ -38,7 +45,7 @@ public class RingOverflowPolicyHandler implements OverflowPolicyHandler @Override public void checkOverflow(final QueueEntry newlyEnqueued) { - _handler.checkOverflow(); + _handler.checkOverflow(newlyEnqueued); } private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener @@ -57,10 +64,10 @@ public class RingOverflowPolicyHandler implements OverflowPolicyHandler @Override void onMaximumQueueDepthChange(final Queue<?> queue) { - checkOverflow(); + checkOverflow(null); } - private void checkOverflow() + private void checkOverflow(final QueueEntry newlyEnqueued) { // When this method causes an entry to be deleted, the size of the queue is changed, leading to // checkOverflow being called again (because for other policies this may trigger relaxation of flow control, @@ -78,6 +85,7 @@ public class RingOverflowPolicyHandler implements OverflowPolicyHandler int counter = 0; int queueDepthMessages; long queueDepthBytes; + QueueEntry lastSeenEntry = null; do { queueDepthMessages = _queue.getQueueDepthMessages(); @@ -94,22 +102,26 @@ public class RingOverflowPolicyHandler implements OverflowPolicyHandler overflow = true; } - QueueEntry entry = _queue.getLeastSignificantOldestEntry(); - - if (entry != null) - { - counter++; - _queue.deleteEntry(entry); - } - else + lastSeenEntry = lastSeenEntry == null + ? 
_queue.getLeastSignificantOldestEntry() + : lastSeenEntry.getNextValidEntry(); + if (lastSeenEntry != null) { - queueDepthMessages = _queue.getQueueDepthMessages(); - queueDepthBytes = _queue.getQueueDepthBytes(); - break; + // ensure that we are deleting only entries before the newly enqueued one + if (newlyEnqueued != null && lastSeenEntry.compareTo(newlyEnqueued) >= 0) + { + // stop at new entry + lastSeenEntry = null; + } + else if (lastSeenEntry.acquireOrSteal(null)) + { + counter++; + deleteAcquiredEntry(lastSeenEntry); + } } } } - while (bytesOverflow || messagesOverflow); + while ((bytesOverflow || messagesOverflow) && lastSeenEntry != null); if (overflow) { @@ -126,6 +138,27 @@ public class RingOverflowPolicyHandler implements OverflowPolicyHandler } } } - } + private void deleteAcquiredEntry(final QueueEntry entry) + { + final MessageStore messageStore = _queue.getVirtualHost().getMessageStore(); + final ServerTransaction txn = + new AsyncAutoCommitTransaction(messageStore, (future, action) -> action.postCommit()); + txn.dequeue(entry.getEnqueueRecord(), + new ServerTransaction.Action() + { + @Override + public void postCommit() + { + entry.delete(); + } + + @Override + public void onRollback() + { + + } + }); + } + } } diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java index a69af14..7143fcd 100644 --- a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java +++ b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java @@ -19,7 +19,6 @@ package org.apache.qpid.server.queue; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; @@ -40,6 +39,9 @@ import org.apache.qpid.server.message.ServerMessage; import 
org.apache.qpid.server.model.OverflowPolicy; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.queue.ProducerFlowControlOverflowPolicyHandlerTest.LogMessageMatcher; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; import org.apache.qpid.test.utils.UnitTestBase; public class RingOverflowPolicyHandlerTest extends UnitTestBase @@ -52,16 +54,24 @@ public class RingOverflowPolicyHandlerTest extends UnitTestBase @Before public void setUp() throws Exception { - _eventLogger = mock(EventLogger.class); _subject = mock(LogSubject.class); + final Transaction tx = mock(Transaction.class); + + final MessageStore messageStore = mock(MessageStore.class); + when(messageStore.newTransaction()).thenReturn(tx); + + final QueueManagingVirtualHost virtualHost = mock(QueueManagingVirtualHost.class); + when(virtualHost.getMessageStore()).thenReturn(messageStore); + _queue = mock(AbstractQueue.class); when(_queue.getMaximumQueueDepthBytes()).thenReturn(-1L); when(_queue.getMaximumQueueDepthMessages()).thenReturn(-1L); when(_queue.getOverflowPolicy()).thenReturn(OverflowPolicy.RING); when(_queue.getQueueDepthMessages()).thenReturn(0); when(_queue.getLogSubject()).thenReturn(_subject); + when(_queue.getVirtualHost()).thenReturn(virtualHost); _ringOverflowPolicyHandler = new RingOverflowPolicyHandler(_queue, _eventLogger); } @@ -77,7 +87,7 @@ public class RingOverflowPolicyHandlerTest extends UnitTestBase _ringOverflowPolicyHandler.checkOverflow(null); - verify(_queue).deleteEntry(lastEntry); + verify(lastEntry).delete(); LogMessage dropped = QueueMessages.DROPPED(1L, 4, 1, 5,-1); verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped))); verifyNoMoreInteractions(_eventLogger); @@ -94,7 +104,7 @@ public class RingOverflowPolicyHandlerTest extends UnitTestBase _ringOverflowPolicyHandler.checkOverflow(null); - 
verify((AbstractQueue<?>) _queue).deleteEntry(lastEntry); + verify(lastEntry).delete(); LogMessage dropped = QueueMessages.DROPPED(1, 4, 5, -1,5); verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped))); verifyNoMoreInteractions(_eventLogger); @@ -103,26 +113,28 @@ public class RingOverflowPolicyHandlerTest extends UnitTestBase @Test public void testCheckOverflowWhenUnderfullBytes() throws Exception { + QueueEntry lastEntry = createLastEntry(); when(_queue.getQueueDepthBytes()).thenReturn(5L); when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L); when(_queue.getQueueDepthMessages()).thenReturn(3); _ringOverflowPolicyHandler.checkOverflow(null); - verify(_queue, never()).deleteEntry(any(QueueEntry.class)); + verify(lastEntry, never()).delete(); verifyNoMoreInteractions(_eventLogger); } @Test public void testCheckOverflowWhenUnderfullMessages() throws Exception { + QueueEntry lastEntry = createLastEntry(); when(_queue.getQueueDepthMessages()).thenReturn(5); when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L); when(_queue.getQueueDepthBytes()).thenReturn(10L); _ringOverflowPolicyHandler.checkOverflow(null); - verify(_queue, never()).deleteEntry(any(QueueEntry.class)); + verify(lastEntry, never()).delete(); verifyNoMoreInteractions(_eventLogger); } @@ -133,6 +145,7 @@ public class RingOverflowPolicyHandlerTest extends UnitTestBase when(oldestMessage.getMessageHeader()).thenReturn(oldestMessageHeader); QueueEntry oldestEntry = mock(QueueEntry.class); when(oldestEntry.getMessage()).thenReturn(oldestMessage); + when(oldestEntry.acquireOrSteal(null)).thenReturn(true); return oldestEntry; } } diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyTest.java new file mode 100644 index 0000000..584485f --- /dev/null +++ b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyTest.java @@ -0,0 +1,213 @@ 
+/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.Serializable; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.apache.qpid.server.configuration.updater.TaskExecutor; +import org.apache.qpid.server.configuration.updater.TaskExecutorImpl; +import org.apache.qpid.server.consumer.ConsumerOption; +import org.apache.qpid.server.consumer.TestConsumerTarget; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.message.internal.InternalMessage; +import org.apache.qpid.server.model.BrokerTestHelper; +import 
org.apache.qpid.server.model.ConfiguredObjectFactory; +import org.apache.qpid.server.model.OverflowPolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.store.MessageEnqueueRecord; +import org.apache.qpid.server.virtualhost.AbstractVirtualHost; +import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost; +import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost; +import org.apache.qpid.test.utils.UnitTestBase; + +public class RingOverflowPolicyTest extends UnitTestBase +{ + private TaskExecutor _taskExecutor; + private QueueManagingVirtualHost<?> _virtualHost; + private AtomicLong _messageId; + + @Before + public void setUp() throws Exception + { + _taskExecutor = new TaskExecutorImpl(); + _taskExecutor.start(); + String name = getClass().getName(); + final VirtualHostNode<?> virtualHostNode = BrokerTestHelper.createVirtualHostNodeMock( + name, true, BrokerTestHelper.createAccessControlMock(), BrokerTestHelper.createBrokerMock()); + when(virtualHostNode.getChildExecutor()).thenReturn(_taskExecutor); + when(virtualHostNode.getTaskExecutor()).thenReturn(_taskExecutor); + + final Map<String, Object> virtualHostAttributes = new HashMap<>(); + virtualHostAttributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE); + virtualHostAttributes.put(VirtualHost.NAME, name); + virtualHostAttributes.put(QueueManagingVirtualHost.CONNECTION_THREAD_POOL_SIZE, 2); + virtualHostAttributes.put(QueueManagingVirtualHost.NUMBER_OF_SELECTORS, 1); + + final ConfiguredObjectFactory objectFactory = virtualHostNode.getObjectFactory(); + final QueueManagingVirtualHost<?> host = (QueueManagingVirtualHost<?>)objectFactory.create(VirtualHost.class, virtualHostAttributes, virtualHostNode); + final AbstractVirtualHost abstractVirtualHost = (AbstractVirtualHost) host; + abstractVirtualHost.start(); + 
when(virtualHostNode.getVirtualHost()).thenReturn(abstractVirtualHost); + _virtualHost = host; + _messageId = new AtomicLong(); + } + + @After + public void tearDown() throws Exception + { + _virtualHost.close(); + _taskExecutor.stop(); + } + + @Test + public void testEnqueueWithOverflowWhenLeastSignificantEntryIsAcquiredByConsumer() throws Exception + { + final Queue<?> queue = createTestRingQueue(2); + + final ServerMessage<?> message1 = enqueueTestMessage(queue); + + final TestConsumerTarget consumerTarget = createTestConsumerTargetAndConsumer(queue); + final boolean received = consumerTarget.processPending(); + assertThat(received, is(true)); + + final MessageInstance receivedMessage = consumerTarget.getMessages().remove(0); + assertThat(receivedMessage, is(notNullValue())); + assertThat(receivedMessage.isAcquired(), is(true)); + assertThat(receivedMessage.getMessage(), is(message1)); + + final ServerMessage<?> message2 = enqueueTestMessage(queue); + assertThat(queue.getQueueDepthMessages(), is(equalTo(2))); + + final ServerMessage<?> message3 = enqueueTestMessage(queue); + assertThat(queue.getQueueDepthMessages(), is(equalTo(2))); + + assertThat(message2.isReferenced(queue), is(equalTo(false))); + assertThat(message3.isReferenced(queue), is(equalTo(true))); + } + + @Test + public void testLeastSignificantEntryAcquiredByConsumerIsDeletedAfterRelease() throws Exception + { + final Queue<?> queue = createTestRingQueue(1); + + final ServerMessage<?> message1 = enqueueTestMessage(queue); + + final TestConsumerTarget consumerTarget = createTestConsumerTargetAndConsumer(queue); + final boolean received = consumerTarget.processPending(); + assertThat(received, is(true)); + + final MessageInstance receivedMessage = consumerTarget.getMessages().remove(0); + assertThat(receivedMessage, is(notNullValue())); + assertThat(receivedMessage.isAcquired(), is(true)); + assertThat(receivedMessage.getMessage(), is(message1)); + + final ServerMessage<?> message2 = 
enqueueTestMessage(queue); + assertThat(queue.getQueueDepthMessages(), is(equalTo(2))); + + assertThat(message1.isReferenced(queue), is(equalTo(true))); + assertThat(message2.isReferenced(queue), is(equalTo(true))); + + receivedMessage.release(); + assertThat(queue.getQueueDepthMessages(), is(equalTo(1))); + assertThat(message1.isReferenced(queue), is(equalTo(false))); + assertThat(message2.isReferenced(queue), is(equalTo(true))); + } + + private Queue<?> createTestRingQueue(final int messageLimit) + { + final Map<String, Object> attributes = new HashMap<>(); + attributes.put(Queue.NAME, getTestName()); + attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING.name()); + attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, messageLimit); + return _virtualHost.createChild(Queue.class, attributes); + } + + private TestConsumerTarget createTestConsumerTargetAndConsumer(final Queue<?> queue) throws Exception + { + final TestConsumerTarget consumerTarget = new TestConsumerTarget(); + queue.addConsumer(consumerTarget, + null, + InternalMessage.class, + getTestName(), + EnumSet.of(ConsumerOption.ACQUIRES, ConsumerOption.SEES_REQUEUES), + 0); + return consumerTarget; + } + + private ServerMessage<?> enqueueTestMessage(final Queue<?> queue) + { + final ServerMessage<?> message = createMessage(_messageId.incrementAndGet(), queue.getName()); + final MessageEnqueueRecord record = createMessageEnqueueRecord(queue.getId(), message.getMessageNumber()); + queue.enqueue(message, null, record); + return message; + } + + private MessageEnqueueRecord createMessageEnqueueRecord(final UUID queueId, final long messageNumber) + { + return new MessageEnqueueRecord() + { + @Override + public UUID getQueueId() + { + return queueId; + } + + @Override + public long getMessageNumber() + { + return messageNumber; + } + }; + } + + private ServerMessage<?> createMessage(final long messageNumber, final String queueName) + { + final AMQMessageHeader amqpHeader = mock(AMQMessageHeader.class); + 
when(amqpHeader.getMessageId()).thenReturn(String.valueOf(messageNumber)); + when(amqpHeader.getExpiration()).thenReturn(0L); + final Serializable messageContent = String.format("test message %d", messageNumber); + return InternalMessage.createMessage(_virtualHost.getMessageStore(), + amqpHeader, + messageContent, + false, + queueName); + } +}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org