This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e45a4a  ARTEMIS-2299 Support for redelivery-delay and LVQ
     new 1917463  This closes #2614
9e45a4a is described below

commit 9e45a4ac3a67c9aa56e6c07e845d4019f6783a8d
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Wed Apr 10 13:59:52 2019 -0400

    ARTEMIS-2299 Support for redelivery-delay and LVQ
---
 .../artemis/core/server/impl/LastValueQueue.java   |  4 +++
 .../artemis/core/server/impl/QueueImpl.java        |  2 +-
 .../artemis/tests/integration/server/LVQTest.java  | 32 ++++++++++++++++++++++
 3 files changed, 37 insertions(+), 1 deletion(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index bd87b3e..e418ee3 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -119,6 +119,10 @@ public class LastValueQueue extends QueueImpl {
 
    @Override
    public synchronized void addHead(final MessageReference ref, boolean 
scheduling) {
+      // we first need to check redelivery-delay, as we can't put anything on 
headers if redelivery-delay
+      if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, 
false)) {
+         return;
+      }
 
       SimpleString lastValueProp = ref.getLastValueProperty();
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index cc6dfc5..cf74815 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -186,7 +186,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private final QueuePendingMessageMetrics deliveringMetrics = new 
QueuePendingMessageMetrics(this);
 
-   private final ScheduledDeliveryHandler scheduledDeliveryHandler;
+   protected final ScheduledDeliveryHandler scheduledDeliveryHandler;
 
    private AtomicLong messagesAdded = new AtomicLong(0);
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
index f262aed..432072c 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java
@@ -96,6 +96,38 @@ public class LVQTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testMultipleRollback() throws Exception {
+      AddressSettings qs = new AddressSettings();
+      qs.setDefaultLastValueQueue(true);
+      qs.setRedeliveryDelay(1);
+      server.getAddressSettingsRepository().addMatch(address.toString(), qs);
+
+      ClientProducer producer = 
clientSessionTxReceives.createProducer(address);
+      ClientConsumer consumer = clientSessionTxReceives.createConsumer(qName1);
+      SimpleString messageId1 = new SimpleString("SMID1");
+      ClientMessage m1 = createTextMessage(clientSession, "m1");
+      m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, messageId1);
+      producer.send(m1);
+      clientSessionTxReceives.start();
+      for (int i = 0; i < 10; i++) {
+         System.out.println("#Deliver " + i);
+         ClientMessage m = consumer.receive(5000);
+         Assert.assertNotNull(m);
+         m.acknowledge();
+         clientSessionTxReceives.rollback();
+      }
+      m1 = createTextMessage(clientSession, "m1");
+      m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, messageId1);
+      producer.send(m1);
+      ClientMessage m = consumer.receive(1000);
+      Assert.assertNotNull(m);
+      m.acknowledge();
+      Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
+      Assert.assertNull(consumer.receiveImmediate());
+      clientSessionTxReceives.commit();
+   }
+
+   @Test
    public void testFirstMessageReceivedButAckedAfter() throws Exception {
       ClientProducer producer = clientSession.createProducer(address);
       ClientConsumer consumer = clientSession.createConsumer(qName1);

Reply via email to