Repository: activemq
Updated Branches:
  refs/heads/trunk f38cb588d -> a2c5c22ec


https://issues.apache.org/jira/browse/AMQ-5513 - additional test for 
reconnect/rebalance case - indicate the absense of delivery information to the 
destination


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a2c5c22e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a2c5c22e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a2c5c22e

Branch: refs/heads/trunk
Commit: a2c5c22ec5e5f30816f0cf3d0f1d192267f7e25e
Parents: f38cb58
Author: gtully <gary.tu...@gmail.com>
Authored: Fri Jan 23 15:23:35 2015 +0000
Committer: gtully <gary.tu...@gmail.com>
Committed: Fri Jan 23 15:23:35 2015 +0000

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |  2 +-
 .../org/apache/activemq/JmsRedeliveredTest.java | 38 ++++++++++++++++++++
 2 files changed, 39 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a2c5c22e/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index a9e4c86..270ed9f 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -1184,7 +1184,7 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
                 cs.getContext().getStopping().set(true);
                 try {
                     LOG.debug("Cleaning up connection resources: {}", 
getRemoteAddress());
-                    processRemoveConnection(cs.getInfo().getConnectionId(), 
0l);
+                    processRemoveConnection(cs.getInfo().getConnectionId(), 
-1);
                 } catch (Throwable ignore) {
                     ignore.printStackTrace();
                 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a2c5c22e/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
index 72a1a28..e5d90d6 100755
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java
@@ -32,6 +32,8 @@ import javax.jms.Topic;
 import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
+import org.apache.activemq.transport.vm.VMTransport;
+import org.apache.activemq.util.Wait;
 
 /**
  * 
@@ -401,6 +403,42 @@ public class JmsRedeliveredTest extends TestCase {
         session.close();
     }
 
+    public void testNoReceiveConsumerDisconnectDoesNotIncrementRedelivery() 
throws Exception {
+        connection.setClientID(getName());
+        connection.start();
+
+        Connection keepBrokerAliveConnection = createConnection();
+        keepBrokerAliveConnection.start();
+
+        Session session = connection.createSession(true, 
Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue("queue-" + getName());
+        final MessageConsumer consumer = session.createConsumer(queue);
+
+        MessageProducer producer = createProducer(session, queue);
+        producer.send(createTextMessage(session));
+        session.commit();
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 
1;
+            }
+        });
+
+        // whack the connection - like a rebalance or tcp drop
+        
((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop();
+
+        session = keepBrokerAliveConnection.createSession(true, 
Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer messageConsumer = session.createConsumer(queue);
+        Message msg = messageConsumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        assertFalse("Message should not be redelivered.", 
msg.getJMSRedelivered());
+        session.close();
+        keepBrokerAliveConnection.close();
+    }
+
     public void testNoReceiveConsumerDoesNotIncrementRedelivery() throws 
Exception {
         connection.setClientID(getName());
         connection.start();

Reply via email to