ARTEMIS-1490 Race on address delete
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9eb780cb Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9eb780cb Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9eb780cb Branch: refs/heads/master Commit: 9eb780cbd16df6b64f97a21a052a1faefd5b4f2b Parents: f8a4e0c Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Mon Oct 30 17:04:10 2017 -0400 Committer: Justin Bertram <jbert...@apache.org> Committed: Tue Oct 31 08:37:40 2017 -0500 ---------------------------------------------------------------------- .../amqp/broker/AMQPSessionCallback.java | 30 ++++------------ .../tests/integration/client/ConsumerTest.java | 36 ++++++++++++++++++++ 2 files changed, 43 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9eb780cb/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index aab414f..19f6351 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -16,9 +16,7 @@ */ package org.apache.activemq.artemis.protocol.amqp.broker; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; @@ -45,7 +43,6 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPResourceLimitExceededException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; @@ -317,35 +314,22 @@ public class AMQPSessionCallback implements SessionCallback { public void closeSender(final Object brokerConsumer) throws Exception { final ServerConsumer consumer = ((ServerConsumer) brokerConsumer); - final CountDownLatch latch = new CountDownLatch(1); - Runnable runnable = new Runnable() { + serverSession.getSessionContext().executeOnCompletion(new IOCallback() { @Override - public void run() { + public void done() { try { consumer.close(false); - latch.countDown(); } catch (Exception e) { + logger.warn(e.getMessage(), e); } } - }; - - // Due to the nature of proton this could be happening within flushes from the queue-delivery (depending on how it happened on the protocol) - // to avoid deadlocks the close has to be done outside of the main thread on an executor - // otherwise you could get a deadlock - Executor executor = protonSPI.getExeuctor(); - if (executor != null) { - executor.execute(runnable); - } else { - runnable.run(); - } + @Override + public void onError(int errorCode, String errorMessage) { + } + }); - try { - latch.await(10, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new ActiveMQAMQPInternalErrorException("Unable to close consumers for queue: " + consumer.getQueue()); - } } public String tempQueueName() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9eb780cb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java index 5733713..af172c8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java @@ -261,6 +261,42 @@ public class ConsumerTest extends ActiveMQTestBase { } @Test + public void testAutoCreateCOnConsumer() throws Throwable { + + final SimpleString thisQueue = SimpleString.toSimpleString("ThisQueue"); + if (!isNetty()) { + // no need to run the test, there's no AMQP support + return; + } + + for (int i = 0; i < 10; i++) { + ConnectionFactory factorySend = createFactory(2); + Connection connection = factorySend.createConnection(); + + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(thisQueue.toString()); + MessageProducer producer = session.createProducer(queue); + + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + + producer.send(session.createTextMessage("hello")); + + Assert.assertNotNull(consumer.receive(5000)); + consumer.close(); + session.close(); + } finally { + connection.close(); + } + + Wait.waitFor(() -> server.getAddressInfo(thisQueue) == null, 1000, 10); + assertNull(server.getAddressInfo(thisQueue)); + assertEquals(0, server.getTotalMessageCount()); + } + } + + @Test public void testAutoDeleteAutoCreatedAddressAndQueue() throws Throwable { if (!isNetty()) { // no need to run the test, there's no AMQP support