[4/5] activemq-artemis git commit: ARTEMIS-853 Support for exclusive consumers
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java -- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index cf2ec59..a0ccfcc 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -52,6 +52,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.QueueAttributes; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -318,7 +319,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (jbd.isQueue() && response.isAutoCreateQueues()) { // perhaps just relying on the broker to do it is simplest (i.e. 
purgeOnNoConsumers) session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true); - session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers()); + createQueue(jbd, RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); } else if (!jbd.isQueue() && response.isAutoCreateAddresses()) { session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true); } else { @@ -629,16 +630,16 @@ public class ActiveMQSession implements QueueSession, TopicSession { queueName = ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName); - if (durability == ConsumerDurability.DURABLE) { -try { - session.createSharedQueue(dest.getSimpleAddress(), RoutingType.MULTICAST, queueName, coreFilterString, true); -} catch (ActiveMQQueueExistsException ignored) { - // We ignore this because querying and then creating the queue wouldn't be idempotent - // we could also add a parameter to ignore existence what would require a bigger work around to avoid - // compatibility. 
+ try { +if (durability == ConsumerDurability.DURABLE) { + createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); +} else { + createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); } - } else { -session.createSharedQueue(dest.getSimpleAddress(), queueName, coreFilterString, false); + } catch (ActiveMQQueueExistsException ignored) { +// We ignore this because querying and then creating the queue wouldn't be idempotent +// we could also add a parameter to ignore existence what would require a bigger work around to avoid +// compatibility. } consumer = session.createConsumer(queueName, null, false); @@ -699,7 +700,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) { if (response.isAutoCreateQueues()) { try { - session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers()); + createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); } catch (ActiveMQQueueExistsException e) { // The queue was created by another client/admin between the query check and send create queue packet }
[4/5] activemq-artemis git commit: ARTEMIS-853 Support for exclusive consumers
ARTEMIS-853 Support for exclusive consumers Support exclusive consumer Allow default address level settings for exclusive consumer Allow queue level setting in broker.xml Add the ability to set queue settings via Core JMS using address. Similar to ActiveMQ 5.X Allow for Core JMS client to define exclusive consumer using address parameters Add tests Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/47c9a90d Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/47c9a90d Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/47c9a90d Branch: refs/heads/master Commit: 47c9a90dcb16b70aeef3a09c89400669add083f5 Parents: 56e1df3 Author: Michael André Pearce Authored: Tue Jan 30 17:43:14 2018 + Committer: Michael Pearce Committed: Wed Feb 7 15:19:01 2018 + -- .../artemis/api/core/ParameterisedAddress.java | 117 +++ .../artemis/api/core/QueueAttributes.java | 79 + .../config/ActiveMQDefaultConfiguration.java| 12 + .../artemis/api/core/client/ClientSession.java | 77 + .../core/management/ActiveMQServerControl.java | 16 + .../api/core/management/QueueControl.java | 12 + .../core/client/impl/AddressQueryImpl.java | 20 +- .../core/client/impl/ClientSessionImpl.java | 96 +- .../core/client/impl/QueueQueryImpl.java| 33 ++ .../core/impl/ActiveMQSessionContext.java | 45 ++- .../impl/wireformat/CreateQueueMessage_V2.java | 41 ++- .../wireformat/CreateSharedQueueMessage_V2.java | 75 - .../SessionBindingQueryResponseMessage_V4.java | 35 +- .../SessionQueueQueryResponseMessage_V3.java| 49 ++- .../artemis/core/server/QueueQueryResult.java | 20 +- .../spi/core/remoting/SessionContext.java | 25 ++ .../artemis/jms/client/ActiveMQDestination.java | 28 +- .../jms/client/ActiveMQMessageProducer.java | 26 +- .../artemis/jms/client/ActiveMQSession.java | 88 - .../jms/client/ActiveMQParameterTest.java | 46 +++ .../client/HornetQClientSessionContext.java | 2 +-
.../core/config/CoreQueueConfiguration.java | 36 +++ .../deployers/impl/FileConfigurationParser.java | 19 +- .../impl/ActiveMQServerControlImpl.java | 15 +- .../core/management/impl/QueueControlImpl.java | 24 ++ .../core/management/impl/view/QueueView.java| 8 +- .../view/predicate/QueueFilterPredicate.java| 6 +- .../core/persistence/QueueBindingInfo.java | 4 + .../journal/AbstractJournalStorageManager.java | 2 +- .../codec/PersistentQueueBindingEncoding.java | 26 +- .../artemis/core/postoffice/PostOffice.java | 3 +- .../core/postoffice/impl/PostOfficeImpl.java| 7 +- .../core/ServerSessionPacketHandler.java| 8 +- .../artemis/core/server/ActiveMQServer.java | 17 + .../artemis/core/server/BindingQueryResult.java | 20 +- .../activemq/artemis/core/server/Queue.java | 6 + .../artemis/core/server/QueueConfig.java| 39 ++- .../artemis/core/server/ServerSession.java | 31 ++ .../core/server/impl/ActiveMQServerImpl.java| 78 - .../core/server/impl/LastValueQueue.java| 8 +- .../server/impl/PostOfficeJournalLoader.java| 1 + .../core/server/impl/QueueFactoryImpl.java | 11 +- .../artemis/core/server/impl/QueueImpl.java | 50 ++- .../core/server/impl/ServerSessionImpl.java | 71 - .../core/settings/impl/AddressSettings.java | 60 +++- .../resources/schema/artemis-configuration.xsd | 22 +- .../impl/ScheduledDeliveryHandlerTest.java | 15 + .../test/resources/artemis-configuration.xsd| 23 +- .../tests/integration/amqp/JMSLVQTest.java | 2 +- .../client/MultipleProducersTest.java | 2 +- .../client/SessionCreateAndDeleteQueueTest.java | 2 +- .../integration/client/UpdateQueueTest.java | 2 +- .../integration/jms/client/ExclusiveTest.java | 319 +++ .../tests/integration/jms/client/LVQTest.java | 145 + .../ActiveMQServerControlUsingCoreTest.java | 10 + .../management/QueueControlUsingCoreTest.java | 10 + .../integration/server/LVQRecoveryTest.java | 4 +- .../tests/integration/server/LVQTest.java | 2 +- .../jms/tests/message/MessageHeaderTest.java| 27 ++ 
.../unit/core/postoffice/impl/FakeQueue.java| 17 + .../core/server/impl/fakes/FakePostOffice.java | 3 +- 61 files changed, 1967 insertions(+), 130 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java -- diff --git