This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 4af6f40  AMQ-6494 is related, fix intermittent failure of RedeliveryPolicyTest related to vm transport server being shut down while in use via async onException handler
4af6f40 is described below

commit 4af6f4018656e01989136042a8ad4ec1ab64c137
Author: gtully <gary.tu...@gmail.com>
AuthorDate: Thu Oct 3 11:08:05 2019 +0100

    AMQ-6494 is related, fix intermittent failure of RedeliveryPolicyTest
    related to vm transport server being shut down while in use via async
    onException handler
---
 .../activemq/transport/vm/VMTransportServer.java   |  9 +++--
 .../org/apache/activemq/RedeliveryPolicyTest.java  | 39 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
index 2f3d519..8bef1cc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransportServer.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.vm;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.command.BrokerInfo;
@@ -35,7 +36,7 @@ public class VMTransportServer implements TransportServer {
 
     private TransportAcceptListener acceptListener;
     private final URI location;
-    private boolean disposed;
+    private AtomicBoolean disposed = new AtomicBoolean(false);
 
     private final AtomicInteger connectionCount = new AtomicInteger(0);
     private final boolean disposeOnDisconnect;
@@ -64,7 +65,7 @@ public class VMTransportServer implements TransportServer {
     public VMTransport connect() throws IOException {
         TransportAcceptListener al;
         synchronized (this) {
-            if (disposed) {
+            if (disposed.get()) {
                 throw new IOException("Server has been disposed.");
             }
             al = acceptListener;
@@ -117,7 +118,9 @@ public class VMTransportServer implements TransportServer {
     }
 
     public void stop() throws IOException {
-        VMTransportFactory.stopped(this);
+        if (disposed.compareAndSet(false, true)) {
+            VMTransportFactory.stopped(this);
+        }
     }
 
     public URI getConnectURI() {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
index 5af3a37..a0a1ca8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java
@@ -600,6 +600,45 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
     }
 
 
+    public void testRepeatedServerClose() throws Exception {
+
+        connection.start();
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        ActiveMQQueue destination = new ActiveMQQueue("TEST");
+        MessageProducer producer = session.createProducer(destination);
+
+        // Send the messages
+        producer.send(session.createTextMessage("1st"));
+        session.commit();
+
+        final int maxRedeliveries = 10000;
+        for (int i=0;i<=maxRedeliveries + 1;i++) {
+
+            final ActiveMQConnection toTest = (ActiveMQConnection)factory.createConnection(userName, password);
+            toTest.start();
+
+            // abortive close via broker
+            for (VMTransportServer transportServer : VMTransportFactory.SERVERS.values()) {
+                transportServer.stop();
+            }
+
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return toTest.isTransportFailed();
+                }
+            },10000, 100 );
+
+            try {
+                toTest.close();
+            } catch (Exception expected) {
+            } finally {
+            }
+        }
+    }
+
+
+
     public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception {
 
         connection.start();

Reply via email to