Author: gtully
Date: Fri Jan 30 15:30:24 2009
New Revision: 739307

URL: http://svn.apache.org/viewvc?rev=739307&view=rev
Log:
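Fix for https://issues.apache.org/activemq/browse/AMQ-1593: remove the
node.incrementRedeliveryCounter() call from the ack-range loop in
PrefetchSubscription, and add tests that compare JMSXDeliveryCount with the
number of deliveries observed after each rollback.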

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Jan 30 15:30:24 2009
@@ -316,7 +316,6 @@
                         inAckRange = true;
                     }
                     if (inAckRange) {
-                        node.incrementRedeliveryCounter();
                         if (ack.getLastMessageId().equals(messageId)) {
                             destination = node.getRegionDestination();
                             callDispatchMatched = true;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Fri Jan 30 15:30:24 2009
@@ -70,9 +70,6 @@
     
     public void doTestRedelivery(String brokerUrl, boolean interleaveProducer) 
throws Exception {
 
-        final int nbMessages = 10;
-        final String destinationName = "Destination";
-
         ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(brokerUrl);
         
         Connection connection = connectionFactory.createConnection();
@@ -179,36 +176,66 @@
         }
     }
     
-    public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception {
+    // AMQ-1593
+    public void testValidateRedeliveryCountOnRollback() throws Exception {
 
-        ConnectionFactory connectionFactory = 
+        final int numMessages = 1;
+       ConnectionFactory connectionFactory = 
             new ActiveMQConnectionFactory(brokerUrl);
         Connection connection = connectionFactory.createConnection();
         connection.start();
 
-        populateDestination(nbMessages, destinationName, connection);
+        populateDestination(numMessages, destinationName, connection);
 
         {
             AtomicInteger received = new AtomicInteger();
-            Map<String, Boolean> rolledback = new ConcurrentHashMap<String, 
Boolean>();
-            while (received.get() < nbMessages) {
-                Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+            final int maxRetries = new 
RedeliveryPolicy().getMaximumRedeliveries();
+            while (received.get() < maxRetries) {
+                Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
                 Destination destination = session.createQueue(destinationName);
 
                 MessageConsumer consumer = 
session.createConsumer(destination);            
                 TextMessage msg = (TextMessage) consumer.receive(1000);
                 if (msg != null) {
-                    if (msg != null && rolledback.put(msg.getText(), 
Boolean.TRUE) != null) {
-                        LOG.info("Received message " + msg.getText() + " (" + 
received.getAndIncrement() + ")" + msg.getJMSMessageID());
-                        assertTrue(msg.getJMSRedelivered());
-                        session.commit();
-                    }
+                    LOG.info("Received message " + msg.getText() + " (" + 
received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                    assertEquals("redelivery property matches deliveries", 
received.get(), msg.getLongProperty("JMSXDeliveryCount"));
+                    session.rollback();
                 }
                 session.close();
             }
         }
     }
     
+    // AMQ-1593
+    public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws 
Exception {
+
+        final int numMessages = 1;
+       ConnectionFactory connectionFactory = 
+            new ActiveMQConnectionFactory(brokerUrl + 
"?jms.prefetchPolicy.queuePrefetch=0");
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+
+        populateDestination(numMessages, destinationName, connection);
+
+        {
+            AtomicInteger received = new AtomicInteger();
+            final int maxRetries = new 
RedeliveryPolicy().getMaximumRedeliveries();
+            while (received.get() < maxRetries) {
+                Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+                Destination destination = session.createQueue(destinationName);
+
+                MessageConsumer consumer = 
session.createConsumer(destination);            
+                TextMessage msg = (TextMessage) consumer.receive(1000);
+                if (msg != null) {
+                    LOG.info("Received message " + msg.getText() + " (" + 
received.getAndIncrement() + ")" + msg.getJMSMessageID());
+                    assertEquals("redelivery property matches deliveries", 
received.get(), msg.getLongProperty("JMSXDeliveryCount"));
+                    session.rollback();
+                }
+                session.close();
+            }
+        }
+    }
+
     public void testRedeliveryPropertyWithNoRollback() throws Exception {
         ConnectionFactory connectionFactory = 
             new ActiveMQConnectionFactory(brokerUrl);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=739307&r1=739306&r2=739307&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Fri Jan 30 15:30:24 2009
@@ -140,7 +140,7 @@
         doSetUp();
         remoteConsumer = remoteSession.createDurableSubscriber(included, 
consumerName);
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            assertNotNull(remoteConsumer.receive(500));
+            assertNotNull("message count: " + i, remoteConsumer.receive(1000));
         }
     }
 


Reply via email to