Repository: activemq Updated Branches: refs/heads/trunk f38cb588d -> a2c5c22ec
https://issues.apache.org/jira/browse/AMQ-5513 - additional test for reconnect/rebalance case - indicate the absense of delivery information to the destination Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a2c5c22e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a2c5c22e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a2c5c22e Branch: refs/heads/trunk Commit: a2c5c22ec5e5f30816f0cf3d0f1d192267f7e25e Parents: f38cb58 Author: gtully <gary.tu...@gmail.com> Authored: Fri Jan 23 15:23:35 2015 +0000 Committer: gtully <gary.tu...@gmail.com> Committed: Fri Jan 23 15:23:35 2015 +0000 ---------------------------------------------------------------------- .../activemq/broker/TransportConnection.java | 2 +- .../org/apache/activemq/JmsRedeliveredTest.java | 38 ++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a2c5c22e/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index a9e4c86..270ed9f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -1184,7 +1184,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { cs.getContext().getStopping().set(true); try { LOG.debug("Cleaning up connection resources: {}", getRemoteAddress()); - processRemoveConnection(cs.getInfo().getConnectionId(), 0l); + processRemoveConnection(cs.getInfo().getConnectionId(), -1); } catch (Throwable ignore) { ignore.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/a2c5c22e/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java index 72a1a28..e5d90d6 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java @@ -32,6 +32,8 @@ import javax.jms.Topic; import junit.framework.Test; import junit.framework.TestCase; import junit.framework.TestSuite; +import org.apache.activemq.transport.vm.VMTransport; +import org.apache.activemq.util.Wait; /** * @@ -401,6 +403,42 @@ public class JmsRedeliveredTest extends TestCase { session.close(); } + public void testNoReceiveConsumerDisconnectDoesNotIncrementRedelivery() throws Exception { + connection.setClientID(getName()); + connection.start(); + + Connection keepBrokerAliveConnection = createConnection(); + keepBrokerAliveConnection.start(); + + Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + Queue queue = session.createQueue("queue-" + getName()); + final MessageConsumer consumer = session.createConsumer(queue); + + MessageProducer producer = createProducer(session, queue); + producer.send(createTextMessage(session)); + session.commit(); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 1; + } + }); + + // whack the connection - like a rebalance or tcp drop + ((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop(); + + session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(queue); + Message msg = messageConsumer.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + + assertFalse("Message should not be redelivered.", msg.getJMSRedelivered()); + session.close(); + keepBrokerAliveConnection.close(); + } + public void testNoReceiveConsumerDoesNotIncrementRedelivery() throws Exception { connection.setClientID(getName()); connection.start();