This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new 9e45a4a ARTEMIS-2299 Support for redelivery-delay and LVQ new 1917463 This closes #2614 9e45a4a is described below commit 9e45a4ac3a67c9aa56e6c07e845d4019f6783a8d Author: Clebert Suconic <clebertsuco...@apache.org> AuthorDate: Wed Apr 10 13:59:52 2019 -0400 ARTEMIS-2299 Support for redelivery-delay and LVQ --- .../artemis/core/server/impl/LastValueQueue.java | 4 +++ .../artemis/core/server/impl/QueueImpl.java | 2 +- .../artemis/tests/integration/server/LVQTest.java | 32 ++++++++++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index bd87b3e..e418ee3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -119,6 +119,10 @@ public class LastValueQueue extends QueueImpl { @Override public synchronized void addHead(final MessageReference ref, boolean scheduling) { + // we first need to check redelivery-delay, as we can't put anything on headers if redelivery-delay + if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) { + return; + } SimpleString lastValueProp = ref.getLastValueProperty(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index cc6dfc5..cf74815 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -186,7 +186,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final QueuePendingMessageMetrics deliveringMetrics = new QueuePendingMessageMetrics(this); - private final ScheduledDeliveryHandler scheduledDeliveryHandler; + protected final ScheduledDeliveryHandler scheduledDeliveryHandler; private AtomicLong messagesAdded = new AtomicLong(0); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java index f262aed..432072c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java @@ -96,6 +96,38 @@ public class LVQTest extends ActiveMQTestBase { } @Test + public void testMultipleRollback() throws Exception { + AddressSettings qs = new AddressSettings(); + qs.setDefaultLastValueQueue(true); + qs.setRedeliveryDelay(1); + server.getAddressSettingsRepository().addMatch(address.toString(), qs); + + ClientProducer producer = clientSessionTxReceives.createProducer(address); + ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1); + SimpleString messageId1 = new SimpleString("SMID1"); + ClientMessage m1 = createTextMessage(clientSession, "m1"); + m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, messageId1); + producer.send(m1); + clientSessionTxReceives.start(); + for (int i = 0; i < 10; i++) { + System.out.println("#Deliver " + i); + ClientMessage m = consumer.receive(5000); + Assert.assertNotNull(m); + m.acknowledge(); + clientSessionTxReceives.rollback(); + } + m1 = createTextMessage(clientSession, "m1"); + m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, messageId1); + producer.send(m1); + ClientMessage m = consumer.receive(1000); + Assert.assertNotNull(m); + m.acknowledge(); + Assert.assertEquals(m.getBodyBuffer().readString(), "m1"); + Assert.assertNull(consumer.receiveImmediate()); + clientSessionTxReceives.commit(); + } + + @Test public void testFirstMessageReceivedButAckedAfter() throws Exception { ClientProducer producer = clientSession.createProducer(address); ClientConsumer consumer = clientSession.createConsumer(qName1);