Author: gtully
Date: Fri Jan 30 15:30:24 2009
New Revision: 739307

URL: http://svn.apache.org/viewvc?rev=739307&view=rev
Log:
fix for https://issues.apache.org/activemq/browse/AMQ-1593

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=739307&r1=739306&r2=739307&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Jan 30 15:30:24 2009 @@ -316,7 +316,6 @@ inAckRange = true; } if (inAckRange) { - node.incrementRedeliveryCounter(); if (ack.getLastMessageId().equals(messageId)) { destination = node.getRegionDestination(); callDispatchMatched = true; Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java?rev=739307&r1=739306&r2=739307&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java Fri Jan 30 15:30:24 2009 @@ -70,9 +70,6 @@ public void doTestRedelivery(String brokerUrl, boolean interleaveProducer) throws Exception { - final int nbMessages = 10; - final String destinationName = "Destination"; - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl); 
Connection connection = connectionFactory.createConnection(); @@ -179,36 +176,66 @@ } } - public void testRedeliveryOnSessionCloseWithNoRollback() throws Exception { + // AMQ-1593 + public void testValidateRedeliveryCountOnRollback() throws Exception { - ConnectionFactory connectionFactory = + final int numMessages = 1; + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl); Connection connection = connectionFactory.createConnection(); connection.start(); - populateDestination(nbMessages, destinationName, connection); + populateDestination(numMessages, destinationName, connection); { AtomicInteger received = new AtomicInteger(); - Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>(); - while (received.get() < nbMessages) { - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries(); + while (received.get() < maxRetries) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Destination destination = session.createQueue(destinationName); MessageConsumer consumer = session.createConsumer(destination); TextMessage msg = (TextMessage) consumer.receive(1000); if (msg != null) { - if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { - LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); - assertTrue(msg.getJMSRedelivered()); - session.commit(); - } + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount")); + session.rollback(); } session.close(); } } } + // AMQ-1593 + public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws Exception { + + final int numMessages = 1; + ConnectionFactory connectionFactory = + new 
ActiveMQConnectionFactory(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(numMessages, destinationName, connection); + + { + AtomicInteger received = new AtomicInteger(); + final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries(); + while (received.get() < maxRetries) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(destinationName); + + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(1000); + if (msg != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount")); + session.rollback(); + } + session.close(); + } + } + } + public void testRedeliveryPropertyWithNoRollback() throws Exception { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl); Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=739307&r1=739306&r2=739307&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original) +++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Fri Jan 30 15:30:24 2009 @@ -140,7 +140,7 @@ doSetUp(); remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName); for (int i = 0; i < MESSAGE_COUNT; i++) { - assertNotNull(remoteConsumer.receive(500)); + assertNotNull("message count: " + i, remoteConsumer.receive(1000)); } }