[4/5] activemq-artemis git commit: ARTEMIS-853 Support for exclusive consumers
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java -- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index cf2ec59..a0ccfcc 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -52,6 +52,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.QueueAttributes; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -318,7 +319,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (jbd.isQueue() && response.isAutoCreateQueues()) { // perhaps just relying on the broker to do it is simplest (i.e. 
purgeOnNoConsumers) session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true); - session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers()); + createQueue(jbd, RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); } else if (!jbd.isQueue() && response.isAutoCreateAddresses()) { session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true); } else { @@ -629,16 +630,16 @@ public class ActiveMQSession implements QueueSession, TopicSession { queueName = ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName); - if (durability == ConsumerDurability.DURABLE) { -try { - session.createSharedQueue(dest.getSimpleAddress(), RoutingType.MULTICAST, queueName, coreFilterString, true); -} catch (ActiveMQQueueExistsException ignored) { - // We ignore this because querying and then creating the queue wouldn't be idempotent - // we could also add a parameter to ignore existence what would require a bigger work around to avoid - // compatibility. 
+ try { +if (durability == ConsumerDurability.DURABLE) { + createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); +} else { + createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); } - } else { -session.createSharedQueue(dest.getSimpleAddress(), queueName, coreFilterString, false); + } catch (ActiveMQQueueExistsException ignored) { +// We ignore this because querying and then creating the queue wouldn't be idempotent +// we could also add a parameter to ignore existence what would require a bigger work around to avoid +// compatibility. } consumer = session.createConsumer(queueName, null, false); @@ -699,7 +700,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) { if (response.isAutoCreateQueues()) { try { - session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers()); + createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); } catch (ActiveMQQueueExistsException e) { // The queue was created by another client/admin between the query check and send create queue packet }
[1/5] activemq-artemis git commit: This closes #1832
Repository: activemq-artemis Updated Branches: refs/heads/master 56e1df3c3 -> 4883b0360 This closes #1832 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4883b036 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4883b036 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4883b036 Branch: refs/heads/master Commit: 4883b0360ed54e950d4cda9fcfd8c5be3cf88cc6 Parents: 56e1df3 38c45c9 Author: Michael Andre PearceAuthored: Wed Feb 7 15:27:29 2018 + Committer: Michael Andre Pearce Committed: Wed Feb 7 15:27:29 2018 + -- .../artemis/api/core/ParameterisedAddress.java | 98 ++ .../artemis/api/core/QueueAttributes.java | 79 + .../activemq/artemis/utils/uri/URISupport.java | 32 +- .../config/ActiveMQDefaultConfiguration.java| 12 + .../artemis/api/core/client/ClientSession.java | 77 + .../core/management/ActiveMQServerControl.java | 16 + .../api/core/management/QueueControl.java | 12 + .../core/client/impl/AddressQueryImpl.java | 20 +- .../core/client/impl/ClientSessionImpl.java | 96 +- .../core/client/impl/QueueQueryImpl.java| 33 ++ .../core/impl/ActiveMQSessionContext.java | 45 ++- .../impl/wireformat/CreateQueueMessage_V2.java | 41 ++- .../wireformat/CreateSharedQueueMessage_V2.java | 75 - .../SessionBindingQueryResponseMessage_V4.java | 35 +- .../SessionQueueQueryResponseMessage_V3.java| 49 ++- .../artemis/core/server/QueueQueryResult.java | 20 +- .../spi/core/remoting/SessionContext.java | 25 ++ .../artemis/jms/client/ActiveMQDestination.java | 28 +- .../jms/client/ActiveMQMessageProducer.java | 26 +- .../artemis/jms/client/ActiveMQSession.java | 88 - .../jms/client/ActiveMQParameterTest.java | 63 .../client/HornetQClientSessionContext.java | 2 +- .../core/config/CoreQueueConfiguration.java | 36 +++ .../deployers/impl/FileConfigurationParser.java | 19 +- .../impl/ActiveMQServerControlImpl.java | 15 +- .../core/management/impl/QueueControlImpl.java | 
24 ++ .../core/management/impl/view/QueueView.java| 8 +- .../view/predicate/QueueFilterPredicate.java| 6 +- .../core/persistence/QueueBindingInfo.java | 4 + .../journal/AbstractJournalStorageManager.java | 2 +- .../codec/PersistentQueueBindingEncoding.java | 26 +- .../artemis/core/postoffice/PostOffice.java | 3 +- .../core/postoffice/impl/PostOfficeImpl.java| 7 +- .../core/ServerSessionPacketHandler.java| 8 +- .../artemis/core/server/ActiveMQServer.java | 17 + .../artemis/core/server/BindingQueryResult.java | 20 +- .../activemq/artemis/core/server/Queue.java | 6 + .../artemis/core/server/QueueConfig.java| 39 ++- .../artemis/core/server/ServerSession.java | 31 ++ .../core/server/impl/ActiveMQServerImpl.java| 78 - .../core/server/impl/LastValueQueue.java| 8 +- .../server/impl/PostOfficeJournalLoader.java| 1 + .../core/server/impl/QueueFactoryImpl.java | 11 +- .../artemis/core/server/impl/QueueImpl.java | 50 ++- .../core/server/impl/ServerSessionImpl.java | 71 - .../core/settings/impl/AddressSettings.java | 60 +++- .../resources/schema/artemis-configuration.xsd | 22 +- .../impl/ScheduledDeliveryHandlerTest.java | 15 + .../test/resources/artemis-configuration.xsd| 23 +- .../tests/integration/amqp/JMSLVQTest.java | 2 +- .../client/MultipleProducersTest.java | 2 +- .../client/SessionCreateAndDeleteQueueTest.java | 2 +- .../integration/client/UpdateQueueTest.java | 2 +- .../integration/jms/client/ExclusiveTest.java | 319 +++ .../tests/integration/jms/client/LVQTest.java | 145 + .../ActiveMQServerControlUsingCoreTest.java | 10 + .../management/QueueControlUsingCoreTest.java | 10 + .../integration/server/LVQRecoveryTest.java | 4 +- .../tests/integration/server/LVQTest.java | 2 +- .../jms/tests/message/MessageHeaderTest.java| 27 ++ .../unit/core/postoffice/impl/FakeQueue.java| 17 + .../core/server/impl/fakes/FakePostOffice.java | 3 +- 62 files changed, 1988 insertions(+), 139 deletions(-) --
[3/5] activemq-artemis git commit: ARTEMIS-853 Support for exclusive consumers
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java -- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index a3e817b..55125bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -559,7 +559,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable) throws Exception { AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString()); - return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), false); + return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), false); } public Queue createQueue(final AddressInfo addressInfo, @@ -569,6 +569,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final boolean durable, final int maxConsumers, final boolean purgeOnNoConsumers, +final boolean exclusive, +final boolean lastValue, final boolean autoCreated) throws Exception { final SimpleString unPrefixedName = removePrefix(name); @@ -583,7 +585,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { server.checkQueueCreationLimit(getUsername()); - Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, 
autoCreated, maxConsumers, purgeOnNoConsumers, server.getAddressSettingsRepository().getMatch(art.getName().toString()).isAutoCreateAddresses()); + Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, server.getAddressSettingsRepository().getMatch(art.getName().toString()).isAutoCreateAddresses()); if (temporary) { // Temporary queue in core simply means the queue will be deleted if @@ -622,7 +624,30 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final int maxConsumers, final boolean purgeOnNoConsumers, final boolean autoCreated) throws Exception { - return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, autoCreated); + AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString()); + return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), autoCreated); + } + + @Override + public Queue createQueue(final SimpleString address, +final SimpleString name, +final RoutingType routingType, +final SimpleString filterString, +final boolean temporary, +final boolean durable, +final int maxConsumers, +final boolean purgeOnNoConsumers, +final Boolean exclusive, +final Boolean lastValue, +final boolean autoCreated) throws Exception { + if (exclusive == null || lastValue == null) { + AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString()); + return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, +exclusive == null ? as.isDefaultExclusiveQueue() : exclusive, lastValue == null ? 
as.isDefaultLastValueQueue() : lastValue, autoCreated); + } else { + return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, +exclusive, lastValue, autoCreated); + } }
[5/5] activemq-artemis git commit: ARTEMIS-853 Support for exclusive consumers
ARTEMIS-853 Support for exclusive consumers Support exclusive consumer Allow default address level settings for exclusive consumer Allow queue level setting in broker.xml Add the ability to set queue settings via Core JMS using address. Similar to ActiveMQ 5.X Allow for Core JMS client to define exclusive consumer using address parameters Add tests Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/dc41f3ca Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/dc41f3ca Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/dc41f3ca Branch: refs/heads/master Commit: dc41f3ca491e96e199290a225fdaa07ac05d66df Parents: 56e1df3 Author: Michael André Pearce Authored: Tue Jan 30 17:43:14 2018 + Committer: Michael Andre Pearce Committed: Wed Feb 7 15:27:29 2018 + -- .../artemis/api/core/ParameterisedAddress.java | 117 +++ .../artemis/api/core/QueueAttributes.java | 79 + .../config/ActiveMQDefaultConfiguration.java| 12 + .../artemis/api/core/client/ClientSession.java | 77 + .../core/management/ActiveMQServerControl.java | 16 + .../api/core/management/QueueControl.java | 12 + .../core/client/impl/AddressQueryImpl.java | 20 +- .../core/client/impl/ClientSessionImpl.java | 96 +- .../core/client/impl/QueueQueryImpl.java| 33 ++ .../core/impl/ActiveMQSessionContext.java | 45 ++- .../impl/wireformat/CreateQueueMessage_V2.java | 41 ++- .../wireformat/CreateSharedQueueMessage_V2.java | 75 - .../SessionBindingQueryResponseMessage_V4.java | 35 +- .../SessionQueueQueryResponseMessage_V3.java| 49 ++- .../artemis/core/server/QueueQueryResult.java | 20 +- .../spi/core/remoting/SessionContext.java | 25 ++ .../artemis/jms/client/ActiveMQDestination.java | 28 +- .../jms/client/ActiveMQMessageProducer.java | 26 +- .../artemis/jms/client/ActiveMQSession.java | 88 - .../jms/client/ActiveMQParameterTest.java | 46 +++ .../client/HornetQClientSessionContext.java | 2 +- 
.../core/config/CoreQueueConfiguration.java | 36 +++ .../deployers/impl/FileConfigurationParser.java | 19 +- .../impl/ActiveMQServerControlImpl.java | 15 +- .../core/management/impl/QueueControlImpl.java | 24 ++ .../core/management/impl/view/QueueView.java| 8 +- .../view/predicate/QueueFilterPredicate.java| 6 +- .../core/persistence/QueueBindingInfo.java | 4 + .../journal/AbstractJournalStorageManager.java | 2 +- .../codec/PersistentQueueBindingEncoding.java | 26 +- .../artemis/core/postoffice/PostOffice.java | 3 +- .../core/postoffice/impl/PostOfficeImpl.java| 7 +- .../core/ServerSessionPacketHandler.java| 8 +- .../artemis/core/server/ActiveMQServer.java | 17 + .../artemis/core/server/BindingQueryResult.java | 20 +- .../activemq/artemis/core/server/Queue.java | 6 + .../artemis/core/server/QueueConfig.java| 39 ++- .../artemis/core/server/ServerSession.java | 31 ++ .../core/server/impl/ActiveMQServerImpl.java| 78 - .../core/server/impl/LastValueQueue.java| 8 +- .../server/impl/PostOfficeJournalLoader.java| 1 + .../core/server/impl/QueueFactoryImpl.java | 11 +- .../artemis/core/server/impl/QueueImpl.java | 50 ++- .../core/server/impl/ServerSessionImpl.java | 71 - .../core/settings/impl/AddressSettings.java | 60 +++- .../resources/schema/artemis-configuration.xsd | 22 +- .../impl/ScheduledDeliveryHandlerTest.java | 15 + .../test/resources/artemis-configuration.xsd| 23 +- .../tests/integration/amqp/JMSLVQTest.java | 2 +- .../client/MultipleProducersTest.java | 2 +- .../client/SessionCreateAndDeleteQueueTest.java | 2 +- .../integration/client/UpdateQueueTest.java | 2 +- .../integration/jms/client/ExclusiveTest.java | 319 +++ .../tests/integration/jms/client/LVQTest.java | 145 + .../ActiveMQServerControlUsingCoreTest.java | 10 + .../management/QueueControlUsingCoreTest.java | 10 + .../integration/server/LVQRecoveryTest.java | 4 +- .../tests/integration/server/LVQTest.java | 2 +- .../jms/tests/message/MessageHeaderTest.java| 27 ++ 
.../unit/core/postoffice/impl/FakeQueue.java| 17 + .../core/server/impl/fakes/FakePostOffice.java | 3 +- 61 files changed, 1967 insertions(+), 130 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/dc41f3ca/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java -- diff --git
[2/5] activemq-artemis git commit: ARTEMIS-853 Support for exclusive consumers
ARTEMIS-853 Support for exclusive consumers Rationalise and re-use URISupport. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/38c45c92 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/38c45c92 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/38c45c92 Branch: refs/heads/master Commit: 38c45c92142cd6a3b9f49f2d436d7a0665309c00 Parents: dc41f3c Author: Michael André PearceAuthored: Fri Feb 2 18:19:40 2018 + Committer: Michael Andre Pearce Committed: Wed Feb 7 15:27:29 2018 + -- .../artemis/api/core/ParameterisedAddress.java | 55 +++- .../activemq/artemis/utils/uri/URISupport.java | 32 .../jms/client/ActiveMQParameterTest.java | 17 ++ 3 files changed, 58 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/38c45c92/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java -- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java index bbc3c4d..6a6d45c 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java @@ -16,46 +16,32 @@ */ package org.apache.activemq.artemis.api.core; +import static org.apache.activemq.artemis.utils.uri.URISupport.appendParameters; +import static org.apache.activemq.artemis.utils.uri.URISupport.parseQuery; + +import java.net.URISyntaxException; import java.util.Map; +import org.apache.activemq.artemis.utils.uri.URISupport; + public class ParameterisedAddress { - public static SimpleString toParameterisedAddress(SimpleString address, Map parameters) { - if (parameters != null && parameters.size() > 0) { + public static SimpleString 
toParameterisedAddress(SimpleString address, Map parameters) throws URISyntaxException { + if (parameters != null && !parameters.isEmpty()) { return SimpleString.toSimpleString(toParameterisedAddress(address.toString(), parameters)); } else { return address; } } - public static String toParameterisedAddress(String address, Map parameters) { - if (parameters != null && parameters.size() > 0) { - StringBuilder stringBuilder = new StringBuilder(address).append(PARAMETER_MARKER); - return toParameterString(stringBuilder, parameters).toString(); + public static String toParameterisedAddress(String address, Map parameters) throws URISyntaxException { + if (parameters != null && !parameters.isEmpty()) { + return appendParameters(new StringBuilder(address), parameters).toString(); } else { return address; } } - private static StringBuilder toParameterString(StringBuilder stringBuilder, Map parameters) { - boolean first = true; - for (Map.Entry entry : parameters.entrySet()) { - if (first) { -first = false; - } else { -stringBuilder.append(PARAMETER_SEPERATOR); - } - stringBuilder.append(entry.getKey()).append(PARAMETER_KEY_VALUE_SEPERATOR).append(entry.getValue()); - } - return stringBuilder; - } - - public static char PARAMETER_SEPERATOR = '&'; - public static char PARAMETER_KEY_VALUE_SEPERATOR = '='; - public static char PARAMETER_MARKER = '?'; - public static String PARAMETER_SEPERATOR_STRING = Character.toString(PARAMETER_SEPERATOR); - public static String PARAMETER_KEY_VALUE_SEPERATOR_STRING = Character.toString(PARAMETER_KEY_VALUE_SEPERATOR); - public static String PARAMETER_MARKER_STRING = Character.toString(PARAMETER_MARKER); private final SimpleString address; private final QueueAttributes queueAttributes; @@ -81,22 +67,17 @@ public class ParameterisedAddress { } public ParameterisedAddress(String address) { - int index = address.indexOf(PARAMETER_MARKER); + int index = address.indexOf('?'); if (index == -1) { this.address = SimpleString.toSimpleString(address); 
this.queueAttributes = null; } else { this.address = SimpleString.toSimpleString(address.substring(0, index)); - String parametersString = address.substring(index + 1, address.length()); - String[] parameterPairs = parametersString.split(PARAMETER_SEPERATOR_STRING);
[activemq-artemis] Git Push Summary [forced push!] [Forced Update!]
Repository: activemq-artemis Updated Branches: refs/heads/master cd90bb895 -> 56e1df3c3 (forced update)
[4/5] activemq-artemis git commit: ARTEMIS-853 Support for exclusive consumers
ARTEMIS-853 Support for exclusive consumers Support exclusive consumer Allow default address level settings for exclusive consumer Allow queue level setting in broker.xml Add the ability to set queue settings via Core JMS using address. Similar to ActiveMQ 5.X Allow for Core JMS client to define exclusive consumer using address parameters Add tests Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/47c9a90d Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/47c9a90d Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/47c9a90d Branch: refs/heads/master Commit: 47c9a90dcb16b70aeef3a09c89400669add083f5 Parents: 56e1df3 Author: Michael André Pearce Authored: Tue Jan 30 17:43:14 2018 + Committer: Michael Pearce Committed: Wed Feb 7 15:19:01 2018 + -- .../artemis/api/core/ParameterisedAddress.java | 117 +++ .../artemis/api/core/QueueAttributes.java | 79 + .../config/ActiveMQDefaultConfiguration.java| 12 + .../artemis/api/core/client/ClientSession.java | 77 + .../core/management/ActiveMQServerControl.java | 16 + .../api/core/management/QueueControl.java | 12 + .../core/client/impl/AddressQueryImpl.java | 20 +- .../core/client/impl/ClientSessionImpl.java | 96 +- .../core/client/impl/QueueQueryImpl.java| 33 ++ .../core/impl/ActiveMQSessionContext.java | 45 ++- .../impl/wireformat/CreateQueueMessage_V2.java | 41 ++- .../wireformat/CreateSharedQueueMessage_V2.java | 75 - .../SessionBindingQueryResponseMessage_V4.java | 35 +- .../SessionQueueQueryResponseMessage_V3.java| 49 ++- .../artemis/core/server/QueueQueryResult.java | 20 +- .../spi/core/remoting/SessionContext.java | 25 ++ .../artemis/jms/client/ActiveMQDestination.java | 28 +- .../jms/client/ActiveMQMessageProducer.java | 26 +- .../artemis/jms/client/ActiveMQSession.java | 88 - .../jms/client/ActiveMQParameterTest.java | 46 +++ .../client/HornetQClientSessionContext.java | 2 +- 
.../core/config/CoreQueueConfiguration.java | 36 +++ .../deployers/impl/FileConfigurationParser.java | 19 +- .../impl/ActiveMQServerControlImpl.java | 15 +- .../core/management/impl/QueueControlImpl.java | 24 ++ .../core/management/impl/view/QueueView.java| 8 +- .../view/predicate/QueueFilterPredicate.java| 6 +- .../core/persistence/QueueBindingInfo.java | 4 + .../journal/AbstractJournalStorageManager.java | 2 +- .../codec/PersistentQueueBindingEncoding.java | 26 +- .../artemis/core/postoffice/PostOffice.java | 3 +- .../core/postoffice/impl/PostOfficeImpl.java| 7 +- .../core/ServerSessionPacketHandler.java| 8 +- .../artemis/core/server/ActiveMQServer.java | 17 + .../artemis/core/server/BindingQueryResult.java | 20 +- .../activemq/artemis/core/server/Queue.java | 6 + .../artemis/core/server/QueueConfig.java| 39 ++- .../artemis/core/server/ServerSession.java | 31 ++ .../core/server/impl/ActiveMQServerImpl.java| 78 - .../core/server/impl/LastValueQueue.java| 8 +- .../server/impl/PostOfficeJournalLoader.java| 1 + .../core/server/impl/QueueFactoryImpl.java | 11 +- .../artemis/core/server/impl/QueueImpl.java | 50 ++- .../core/server/impl/ServerSessionImpl.java | 71 - .../core/settings/impl/AddressSettings.java | 60 +++- .../resources/schema/artemis-configuration.xsd | 22 +- .../impl/ScheduledDeliveryHandlerTest.java | 15 + .../test/resources/artemis-configuration.xsd| 23 +- .../tests/integration/amqp/JMSLVQTest.java | 2 +- .../client/MultipleProducersTest.java | 2 +- .../client/SessionCreateAndDeleteQueueTest.java | 2 +- .../integration/client/UpdateQueueTest.java | 2 +- .../integration/jms/client/ExclusiveTest.java | 319 +++ .../tests/integration/jms/client/LVQTest.java | 145 + .../ActiveMQServerControlUsingCoreTest.java | 10 + .../management/QueueControlUsingCoreTest.java | 10 + .../integration/server/LVQRecoveryTest.java | 4 +- .../tests/integration/server/LVQTest.java | 2 +- .../jms/tests/message/MessageHeaderTest.java| 27 ++ 
.../unit/core/postoffice/impl/FakeQueue.java| 17 + .../core/server/impl/fakes/FakePostOffice.java | 3 +- 61 files changed, 1967 insertions(+), 130 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java -- diff --git
[2/5] activemq-artemis git commit: ARTEMIS-853 Support for exclusive consumers
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java -- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index a3e817b..55125bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -559,7 +559,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable) throws Exception { AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString()); - return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), false); + return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), false); } public Queue createQueue(final AddressInfo addressInfo, @@ -569,6 +569,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final boolean durable, final int maxConsumers, final boolean purgeOnNoConsumers, +final boolean exclusive, +final boolean lastValue, final boolean autoCreated) throws Exception { final SimpleString unPrefixedName = removePrefix(name); @@ -583,7 +585,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { server.checkQueueCreationLimit(getUsername()); - Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, 
autoCreated, maxConsumers, purgeOnNoConsumers, server.getAddressSettingsRepository().getMatch(art.getName().toString()).isAutoCreateAddresses()); + Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, server.getAddressSettingsRepository().getMatch(art.getName().toString()).isAutoCreateAddresses()); if (temporary) { // Temporary queue in core simply means the queue will be deleted if @@ -622,7 +624,30 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final int maxConsumers, final boolean purgeOnNoConsumers, final boolean autoCreated) throws Exception { - return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, autoCreated); + AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString()); + return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), autoCreated); + } + + @Override + public Queue createQueue(final SimpleString address, +final SimpleString name, +final RoutingType routingType, +final SimpleString filterString, +final boolean temporary, +final boolean durable, +final int maxConsumers, +final boolean purgeOnNoConsumers, +final Boolean exclusive, +final Boolean lastValue, +final boolean autoCreated) throws Exception { + if (exclusive == null || lastValue == null) { + AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString()); + return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, +exclusive == null ? as.isDefaultExclusiveQueue() : exclusive, lastValue == null ? 
as.isDefaultLastValueQueue() : lastValue, autoCreated); + } else { + return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, +exclusive, lastValue, autoCreated); + } }
[5/5] activemq-artemis git commit: This closes #1832
This closes #1832 Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cd90bb89 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cd90bb89 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cd90bb89 Branch: refs/heads/master Commit: cd90bb895f29da294ca01271174e6b977098ea03 Parents: 56e1df3 5dae8ca Author: Michael PearceAuthored: Wed Feb 7 15:19:02 2018 + Committer: Michael Pearce Committed: Wed Feb 7 15:19:02 2018 + -- .../artemis/api/core/ParameterisedAddress.java | 98 ++ .../artemis/api/core/QueueAttributes.java | 79 + .../activemq/artemis/utils/uri/URISupport.java | 32 +- .../config/ActiveMQDefaultConfiguration.java| 12 + .../artemis/api/core/client/ClientSession.java | 77 + .../core/management/ActiveMQServerControl.java | 16 + .../api/core/management/QueueControl.java | 12 + .../core/client/impl/AddressQueryImpl.java | 20 +- .../core/client/impl/ClientSessionImpl.java | 96 +- .../core/client/impl/QueueQueryImpl.java| 33 ++ .../core/impl/ActiveMQSessionContext.java | 45 ++- .../impl/wireformat/CreateQueueMessage_V2.java | 41 ++- .../wireformat/CreateSharedQueueMessage_V2.java | 75 - .../SessionBindingQueryResponseMessage_V4.java | 35 +- .../SessionQueueQueryResponseMessage_V3.java| 49 ++- .../artemis/core/server/QueueQueryResult.java | 20 +- .../spi/core/remoting/SessionContext.java | 25 ++ .../artemis/jms/client/ActiveMQDestination.java | 28 +- .../jms/client/ActiveMQMessageProducer.java | 26 +- .../artemis/jms/client/ActiveMQSession.java | 88 - .../jms/client/ActiveMQParameterTest.java | 63 .../client/HornetQClientSessionContext.java | 2 +- .../core/config/CoreQueueConfiguration.java | 36 +++ .../deployers/impl/FileConfigurationParser.java | 19 +- .../impl/ActiveMQServerControlImpl.java | 15 +- .../core/management/impl/QueueControlImpl.java | 24 ++ .../core/management/impl/view/QueueView.java| 8 +- 
.../view/predicate/QueueFilterPredicate.java| 6 +- .../core/persistence/QueueBindingInfo.java | 4 + .../journal/AbstractJournalStorageManager.java | 2 +- .../codec/PersistentQueueBindingEncoding.java | 26 +- .../artemis/core/postoffice/PostOffice.java | 3 +- .../core/postoffice/impl/PostOfficeImpl.java| 7 +- .../core/ServerSessionPacketHandler.java| 8 +- .../artemis/core/server/ActiveMQServer.java | 17 + .../artemis/core/server/BindingQueryResult.java | 20 +- .../activemq/artemis/core/server/Queue.java | 6 + .../artemis/core/server/QueueConfig.java| 39 ++- .../artemis/core/server/ServerSession.java | 31 ++ .../core/server/impl/ActiveMQServerImpl.java| 78 - .../core/server/impl/LastValueQueue.java| 8 +- .../server/impl/PostOfficeJournalLoader.java| 1 + .../core/server/impl/QueueFactoryImpl.java | 11 +- .../artemis/core/server/impl/QueueImpl.java | 50 ++- .../core/server/impl/ServerSessionImpl.java | 71 - .../core/settings/impl/AddressSettings.java | 60 +++- .../resources/schema/artemis-configuration.xsd | 22 +- .../impl/ScheduledDeliveryHandlerTest.java | 15 + .../test/resources/artemis-configuration.xsd| 23 +- .../tests/integration/amqp/JMSLVQTest.java | 2 +- .../client/MultipleProducersTest.java | 2 +- .../client/SessionCreateAndDeleteQueueTest.java | 2 +- .../integration/client/UpdateQueueTest.java | 2 +- .../integration/jms/client/ExclusiveTest.java | 319 +++ .../tests/integration/jms/client/LVQTest.java | 145 + .../ActiveMQServerControlUsingCoreTest.java | 10 + .../management/QueueControlUsingCoreTest.java | 10 + .../integration/server/LVQRecoveryTest.java | 4 +- .../tests/integration/server/LVQTest.java | 2 +- .../jms/tests/message/MessageHeaderTest.java| 27 ++ .../unit/core/postoffice/impl/FakeQueue.java| 17 + .../core/server/impl/fakes/FakePostOffice.java | 3 +- 62 files changed, 1988 insertions(+), 139 deletions(-) --
[1/5] activemq-artemis git commit: ARTEMIS-853 Support for exclusive consumers
Repository: activemq-artemis Updated Branches: refs/heads/master 56e1df3c3 -> cd90bb895 ARTEMIS-853 Support for exclusive consumers Rationalise and re-use URISupport. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5dae8ca9 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5dae8ca9 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5dae8ca9 Branch: refs/heads/master Commit: 5dae8ca941d94bb6ae6b7f20876b8afbf7cc953f Parents: 47c9a90 Author: Michael André Pearce Authored: Fri Feb 2 18:19:40 2018 + Committer: Michael Pearce Committed: Wed Feb 7 15:19:01 2018 + -- .../artemis/api/core/ParameterisedAddress.java | 55 +++- .../activemq/artemis/utils/uri/URISupport.java | 32 .../jms/client/ActiveMQParameterTest.java | 17 ++ 3 files changed, 58 insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5dae8ca9/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java -- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java index bbc3c4d..6a6d45c 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java @@ -16,46 +16,32 @@ */ package org.apache.activemq.artemis.api.core; +import static org.apache.activemq.artemis.utils.uri.URISupport.appendParameters; +import static org.apache.activemq.artemis.utils.uri.URISupport.parseQuery; + +import java.net.URISyntaxException; import java.util.Map; +import org.apache.activemq.artemis.utils.uri.URISupport; + public class ParameterisedAddress { - public static SimpleString toParameterisedAddress(SimpleString address, Map parameters) { - if (parameters != null 
&& parameters.size() > 0) { + public static SimpleString toParameterisedAddress(SimpleString address, Map parameters) throws URISyntaxException { + if (parameters != null && !parameters.isEmpty()) { return SimpleString.toSimpleString(toParameterisedAddress(address.toString(), parameters)); } else { return address; } } - public static String toParameterisedAddress(String address, Map parameters) { - if (parameters != null && parameters.size() > 0) { - StringBuilder stringBuilder = new StringBuilder(address).append(PARAMETER_MARKER); - return toParameterString(stringBuilder, parameters).toString(); + public static String toParameterisedAddress(String address, Map parameters) throws URISyntaxException { + if (parameters != null && !parameters.isEmpty()) { + return appendParameters(new StringBuilder(address), parameters).toString(); } else { return address; } } - private static StringBuilder toParameterString(StringBuilder stringBuilder, Map parameters) { - boolean first = true; - for (Map.Entry entry : parameters.entrySet()) { - if (first) { -first = false; - } else { -stringBuilder.append(PARAMETER_SEPERATOR); - } - stringBuilder.append(entry.getKey()).append(PARAMETER_KEY_VALUE_SEPERATOR).append(entry.getValue()); - } - return stringBuilder; - } - - public static char PARAMETER_SEPERATOR = '&'; - public static char PARAMETER_KEY_VALUE_SEPERATOR = '='; - public static char PARAMETER_MARKER = '?'; - public static String PARAMETER_SEPERATOR_STRING = Character.toString(PARAMETER_SEPERATOR); - public static String PARAMETER_KEY_VALUE_SEPERATOR_STRING = Character.toString(PARAMETER_KEY_VALUE_SEPERATOR); - public static String PARAMETER_MARKER_STRING = Character.toString(PARAMETER_MARKER); private final SimpleString address; private final QueueAttributes queueAttributes; @@ -81,22 +67,17 @@ public class ParameterisedAddress { } public ParameterisedAddress(String address) { - int index = address.indexOf(PARAMETER_MARKER); + int index = address.indexOf('?'); if (index == -1) 
{ this.address = SimpleString.toSimpleString(address); this.queueAttributes = null; } else { this.address = SimpleString.toSimpleString(address.substring(0, index)); - String parametersString = address.substring(index + 1, address.length()); -
[3/5] activemq-artemis git commit: ARTEMIS-853 Support for exclusive consumers
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47c9a90d/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java -- diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java index cf2ec59..a0ccfcc 100644 --- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java +++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java @@ -52,6 +52,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; +import org.apache.activemq.artemis.api.core.QueueAttributes; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -318,7 +319,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (jbd.isQueue() && response.isAutoCreateQueues()) { // perhaps just relying on the broker to do it is simplest (i.e. 
purgeOnNoConsumers) session.createAddress(jbd.getSimpleAddress(), RoutingType.ANYCAST, true); - session.createQueue(jbd.getSimpleAddress(), RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers()); + createQueue(jbd, RoutingType.ANYCAST, jbd.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); } else if (!jbd.isQueue() && response.isAutoCreateAddresses()) { session.createAddress(jbd.getSimpleAddress(), RoutingType.MULTICAST, true); } else { @@ -629,16 +630,16 @@ public class ActiveMQSession implements QueueSession, TopicSession { queueName = ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName); - if (durability == ConsumerDurability.DURABLE) { -try { - session.createSharedQueue(dest.getSimpleAddress(), RoutingType.MULTICAST, queueName, coreFilterString, true); -} catch (ActiveMQQueueExistsException ignored) { - // We ignore this because querying and then creating the queue wouldn't be idempotent - // we could also add a parameter to ignore existence what would require a bigger work around to avoid - // compatibility. 
+ try { +if (durability == ConsumerDurability.DURABLE) { + createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); +} else { + createSharedQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, false, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); } - } else { -session.createSharedQueue(dest.getSimpleAddress(), queueName, coreFilterString, false); + } catch (ActiveMQQueueExistsException ignored) { +// We ignore this because querying and then creating the queue wouldn't be idempotent +// we could also add a parameter to ignore existence what would require a bigger work around to avoid +// compatibility. } consumer = session.createConsumer(queueName, null, false); @@ -699,7 +700,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { if (!response.isExists() || !response.getQueueNames().contains(dest.getSimpleAddress())) { if (response.isAutoCreateQueues()) { try { - session.createQueue(dest.getSimpleAddress(), RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers()); + createQueue(dest, RoutingType.ANYCAST, dest.getSimpleAddress(), null, true, true, response.getDefaultMaxConsumers(), response.isDefaultPurgeOnNoConsumers(), response.isDefaultExclusive(), response.isDefaultLastValueQueue()); } catch (ActiveMQQueueExistsException e) { // The queue was created by another client/admin between the query check and send create queue packet }