ARTEMIS-1490 Race on address delete

Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9eb780cb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9eb780cb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9eb780cb

Branch: refs/heads/master
Commit: 9eb780cbd16df6b64f97a21a052a1faefd5b4f2b
Parents: f8a4e0c
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Mon Oct 30 17:04:10 2017 -0400
Committer: Justin Bertram <jbert...@apache.org>
Committed: Tue Oct 31 08:37:40 2017 -0500

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        | 30 ++++------------
 .../tests/integration/client/ConsumerTest.java  | 36 ++++++++++++++++++++
 2 files changed, 43 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9eb780cb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index aab414f..19f6351 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -16,9 +16,7 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
@@ -45,7 +43,6 @@ import 
org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
-import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException;
 import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
@@ -317,35 +314,22 @@ public class AMQPSessionCallback implements 
SessionCallback {
    public void closeSender(final Object brokerConsumer) throws Exception {
 
       final ServerConsumer consumer = ((ServerConsumer) brokerConsumer);
-      final CountDownLatch latch = new CountDownLatch(1);
 
-      Runnable runnable = new Runnable() {
+      serverSession.getSessionContext().executeOnCompletion(new IOCallback() {
          @Override
-         public void run() {
+         public void done() {
             try {
                consumer.close(false);
-               latch.countDown();
             } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
             }
          }
-      };
-
-      // Due to the nature of proton this could be happening within flushes 
from the queue-delivery (depending on how it happened on the protocol)
-      // to avoid deadlocks the close has to be done outside of the main 
thread on an executor
-      // otherwise you could get a deadlock
-      Executor executor = protonSPI.getExeuctor();
 
-      if (executor != null) {
-         executor.execute(runnable);
-      } else {
-         runnable.run();
-      }
+         @Override
+         public void onError(int errorCode, String errorMessage) {
+         }
+      });
 
-      try {
-         latch.await(10, TimeUnit.SECONDS);
-      } catch (InterruptedException e) {
-         throw new ActiveMQAMQPInternalErrorException("Unable to close 
consumers for queue: " + consumer.getQueue());
-      }
    }
 
    public String tempQueueName() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9eb780cb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index 5733713..af172c8 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -261,6 +261,42 @@ public class ConsumerTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testAutoCreateCOnConsumer() throws Throwable {
+
+      final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue");
+      if (!isNetty()) {
+         // no need to run the test, there's no AMQP support
+         return;
+      }
+
+      for (int i = 0; i < 10; i++) {
+         ConnectionFactory factorySend = createFactory(2);
+         Connection connection = factorySend.createConnection();
+
+         try {
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            javax.jms.Queue queue = session.createQueue(thisQueue.toString());
+            MessageProducer producer = session.createProducer(queue);
+
+            MessageConsumer consumer = session.createConsumer(queue);
+            connection.start();
+
+            producer.send(session.createTextMessage("hello"));
+
+            Assert.assertNotNull(consumer.receive(5000));
+            consumer.close();
+            session.close();
+         } finally {
+            connection.close();
+         }
+
+         Wait.waitFor(() -> server.getAddressInfo(thisQueue) == null, 1000, 
10);
+         assertNull(server.getAddressInfo(thisQueue));
+         assertEquals(0, server.getTotalMessageCount());
+      }
+   }
+
+   @Test
    public void testAutoDeleteAutoCreatedAddressAndQueue() throws Throwable {
       if (!isNetty()) {
          // no need to run the test, there's no AMQP support

Reply via email to