ARTEMIS-743 Created QueueConfig that replace and enable additional behaviours on QueueFactory. Added Filter predicate.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c002cf13 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c002cf13 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c002cf13 Branch: refs/heads/master Commit: c002cf13b84308549db99c07918bf1075a5b75be Parents: e790c78 Author: Francesco Nigro <fni...@redhat.com> Authored: Tue Sep 20 22:39:28 2016 +0200 Committer: Martyn Taylor <mtay...@redhat.com> Committed: Thu Sep 22 15:45:14 2016 +0100 ---------------------------------------------------------------------- .../core/management/ActiveMQServerControl.java | 14 +- .../activemq/artemis/core/filter/Filter.java | 10 + .../artemis/core/filter/FilterUtils.java | 36 +++ .../impl/ActiveMQServerControlImpl.java | 10 +- .../artemis/core/server/ActiveMQServer.java | 16 +- .../artemis/core/server/QueueConfig.java | 270 +++++++++++++++++++ .../artemis/core/server/QueueFactory.java | 8 +- .../core/server/impl/ActiveMQServerImpl.java | 124 +++++---- .../server/impl/PostOfficeJournalLoader.java | 48 ++-- .../core/server/impl/QueueFactoryImpl.java | 15 ++ .../resources/schema/artemis-configuration.xsd | 116 ++++---- .../artemis/core/server/QueueConfigTest.java | 46 ++++ .../integration/client/HangConsumerTest.java | 18 +- .../client/InterruptedLargeMessageTest.java | 23 +- .../core/server/impl/QueueConcurrentTest.java | 4 +- .../server/impl/fakes/FakeQueueFactory.java | 9 +- 16 files changed, 597 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index ab78ef9..bb55d19 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -408,6 +408,8 @@ public interface ActiveMQServerControl { /** * Create a durable queue. * <br> + * If {@code address} is {@code null} it will be defaulted to {@code name}. + * <br> * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * * @param address address to bind the queue to @@ -420,6 +422,8 @@ public interface ActiveMQServerControl { /** * Create a queue. * <br> + * If {@code address} is {@code null} it will be defaulted to {@code name}. + * <br> * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * * @param address address to bind the queue to @@ -436,6 +440,8 @@ public interface ActiveMQServerControl { /** * Create a queue. * <br> + * If {@code address} is {@code null} it will be defaulted to {@code name}. + * <br> * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. * * @param address address to bind the queue to @@ -450,6 +456,8 @@ public interface ActiveMQServerControl { /** * Deploy a durable queue. * <br> + * If {@code address} is {@code null} it will be defaulted to {@code name}. + * <br> * This method will do nothing if the queue with the given name already exists on the server. * * @param address address to bind the queue to @@ -464,6 +472,8 @@ public interface ActiveMQServerControl { /** * Deploy a queue. * <br> + * If {@code address} is {@code null} it will be defaulted to {@code name}. + * <br> * This method will do nothing if the queue with the given name already exists on the server. * * @param address address to bind the queue to @@ -645,7 +655,7 @@ public interface ActiveMQServerControl { /** * Lists all the consumers connected to this server. * The returned String is a JSON string containing details about each consumer, e.g.: - *<pre> + * <pre> * [ * { * "queueName": "fa87c64c-0a38-4697-8421-72e34d17429d", @@ -744,7 +754,7 @@ public interface ActiveMQServerControl { @Parameter(desc = "the policy to use when a slow consumer is detected", name = "slowConsumerPolicy") String slowConsumerPolicy, @Parameter(desc = "allow queues to be created automatically", name = "autoCreateJmsQueues") boolean autoCreateJmsQueues, @Parameter(desc = "allow auto-created queues to be deleted automatically", name = "autoDeleteJmsQueues") boolean autoDeleteJmsQueues, - @Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics, + @Parameter(desc = "allow topics to be created automatically", name = "autoCreateJmsTopics") boolean autoCreateJmsTopics, @Parameter(desc = "allow auto-created topics to be deleted automatically", name = "autoDeleteJmsTopics") boolean autoDeleteJmsTopics) throws Exception; void removeAddressSettings(String addressMatch) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java index 5dd507c..41d5e54 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/Filter.java @@ -21,6 +21,16 @@ import org.apache.activemq.artemis.core.server.ServerMessage; public interface Filter { + /** + * JMS Topics (which are outside of the scope of the core API) will require a dumb subscription + * with a dummy-filter at this current version as a way to keep its existence valid and TCK + * tests. That subscription needs an invalid filter, however paging needs to ignore any + * subscription with this filter. For that reason, this filter needs to be rejected on paging or + * any other component on the system, and just be ignored for any purpose It's declared here as + * this filter is considered a global ignore + */ + String GENERIC_IGNORED_FILTER = "__AMQX=-1"; + boolean match(ServerMessage message); SimpleString getFilterString(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java new file mode 100644 index 0000000..c5b1a0a --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/FilterUtils.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.filter; + +public final class FilterUtils { + + private FilterUtils() { + + } + + /** + * Returns {@code true} if {@code filter} is a {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER}. + * + * @param filter a subscription filter + * @return {@code true} if {@code filter} is not {@code null} and is a {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER} + */ + public static boolean isTopicIdentification(final Filter filter) { + return filter != null && filter.getFilterString() != null && filter.getFilterString().toString().equals(Filter.GENERIC_IGNORED_FILTER); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 6aabbe3..362b74a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -551,7 +551,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active clearIO(); try { - server.deployQueue(new SimpleString(address), new SimpleString(name), new SimpleString(filterString), true, false); + server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), new SimpleString(filterString), true, false); } finally { blockOnIO(); @@ -569,7 +569,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active clearIO(); try { - server.deployQueue(new SimpleString(address), new SimpleString(name), filter, durable, false); + server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false); } finally { blockOnIO(); @@ -582,7 +582,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active clearIO(); try { - server.createQueue(new SimpleString(address), new SimpleString(name), null, true, false); + server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), null, true, false); } finally { blockOnIO(); @@ -595,7 +595,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active clearIO(); try { - server.createQueue(new SimpleString(address), new SimpleString(name), null, durable, false); + server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), null, durable, false); } finally { blockOnIO(); @@ -616,7 +616,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active filter = new SimpleString(filterStr); } - server.createQueue(new SimpleString(address), new SimpleString(name), filter, durable, false); + server.createQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false); } finally { blockOnIO(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index ac65335..588c17c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -95,10 +95,14 @@ public interface ActiveMQServer extends ActiveMQComponent { NodeManager getNodeManager(); - /** it will release hold a lock for the activation. */ + /** + * it will release hold a lock for the activation. + */ void unlockActivation(); - /** it will hold a lock for the activation. This will prevent the activation from happening. */ + /** + * it will hold a lock for the activation. This will prevent the activation from happening. + */ void lockActivation(); /** @@ -266,15 +270,17 @@ public interface ActiveMQServer extends ActiveMQComponent { boolean waitForActivation(long timeout, TimeUnit unit) throws InterruptedException; /** - * Creates a shared queue. if non durable it will exist as long as there are consumers. - * + * Creates a transient queue. A queue that will exist as long as there are consumers. + * The queue will be deleted as soon as all the consumers are removed. + * <p> * Notice: the queue won't be deleted until the first consumer arrives. * * @param address * @param name * @param filterString * @param durable - * @throws Exception + * @throws ActiveMQInvalidTransientQueueUseException if the shared queue already exists with a different {@code address} or {@code filter} + * @throws NullPointerException if {@code address} is {@code null} */ void createSharedQueue(final SimpleString address, final SimpleString name, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java new file mode 100644 index 0000000..64df0da --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.filter.FilterUtils; +import org.apache.activemq.artemis.core.paging.PagingManager; +import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; + +public final class QueueConfig { + + private final long id; + private final SimpleString address; + private final SimpleString name; + private final Filter filter; + private final PageSubscription pageSubscription; + private final SimpleString user; + private final boolean durable; + private final boolean temporary; + private final boolean autoCreated; + + public static final class Builder { + + private final long id; + private final SimpleString address; + private final SimpleString name; + private Filter filter; + private PagingManager pagingManager; + private SimpleString user; + private boolean durable; + private boolean temporary; + private boolean autoCreated; + + private Builder(final long id, final SimpleString name) { + this(id, name, name); + } + + private Builder(final long id, final SimpleString name, final SimpleString address) { + this.id = id; + this.name = name; + this.address = address; + this.filter = null; + this.pagingManager = null; + this.user = null; + this.durable = true; + this.temporary = false; + this.autoCreated = true; + validateState(); + } + + private static boolean isEmptyOrNull(SimpleString value) { + return (value == null || value.length() == 0); + } + + private void validateState() { + if (isEmptyOrNull(this.name)) { + throw new IllegalStateException("name can't be null!"); + } + if (isEmptyOrNull(this.address)) { + throw new IllegalStateException("address can't be null!"); + } + } + + public Builder filter(final Filter filter) { + this.filter = filter; + return this; + } + + + public Builder pagingManager(final PagingManager pagingManager) { + this.pagingManager = pagingManager; + return this; + } + + public Builder user(final SimpleString user) { + this.user = user; + return this; + } + + public Builder durable(final boolean durable) { + this.durable = durable; + return this; + } + + public Builder temporary(final boolean temporary) { + this.temporary = temporary; + return this; + } + + public Builder autoCreated(final boolean autoCreated) { + this.autoCreated = autoCreated; + return this; + } + + + /** + * Returns a new {@link QueueConfig} using the parameters configured on the {@link Builder}. + * <br> + * The reference parameters aren't defensively copied from the {@link Builder} to the {@link QueueConfig}. + * <br> + * This method creates a new {@link PageSubscription} only if {@link #pagingManager} is not {@code null} and + * if {@link FilterUtils#isTopicIdentification} returns {@code false} on {@link #filter}. + * + * @throws IllegalStateException if the creation of {@link PageSubscription} fails + */ + public QueueConfig build() { + final PageSubscription pageSubscription; + if (pagingManager != null && !FilterUtils.isTopicIdentification(filter)) { + try { + pageSubscription = this.pagingManager.getPageStore(address).getCursorProvider().createSubscription(id, filter, durable); + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + else { + pageSubscription = null; + } + return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated); + } + + } + + /** + * Returns a new {@link Builder} of a durable, not temporary and autoCreated {@link QueueConfig} with the given {@code id} and {@code name}. + * <br> + * The {@code address} is defaulted to the {@code name} value. + * The reference parameters aren't defensively copied. + * + * @param id the id of the queue to be created + * @param name the name of the queue to be created + * @throws IllegalStateException if {@code name} is {@code null} or empty + */ + public static Builder builderWith(final long id, final SimpleString name) { + return new QueueConfig.Builder(id, name); + } + + /** + * Returns a new {@link Builder} of a durable, not temporary and autoCreated {@link QueueConfig} with the given {@code id}, {@code name} and {@code address}. + * <br> + * The reference parameters aren't defensively copied. + * + * @param id the id of the queue to be created + * @param name the name of the queue to be created + * @param address the address of the queue to be created + * @throws IllegalStateException if {@code name} or {@code address} are {@code null} or empty + */ + public static Builder builderWith(final long id, final SimpleString name, final SimpleString address) { + return new QueueConfig.Builder(id, name, address); + } + + private QueueConfig(final long id, + final SimpleString address, + final SimpleString name, + final Filter filter, + final PageSubscription pageSubscription, + final SimpleString user, + final boolean durable, + final boolean temporary, + final boolean autoCreated) { + this.id = id; + this.address = address; + this.name = name; + this.filter = filter; + this.pageSubscription = pageSubscription; + this.user = user; + this.durable = durable; + this.temporary = temporary; + this.autoCreated = autoCreated; + } + + public long id() { + return id; + } + + public SimpleString address() { + return address; + } + + public SimpleString name() { + return name; + } + + public Filter filter() { + return filter; + } + + public PageSubscription pageSubscription() { + return pageSubscription; + } + + public SimpleString user() { + return user; + } + + public boolean isDurable() { + return durable; + } + + public boolean isTemporary() { + return temporary; + } + + public boolean isAutoCreated() { + return autoCreated; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + QueueConfig that = (QueueConfig) o; + + if (id != that.id) + return false; + if (durable != that.durable) + return false; + if (temporary != that.temporary) + return false; + if (autoCreated != that.autoCreated) + return false; + if (address != null ? !address.equals(that.address) : that.address != null) + return false; + if (name != null ? !name.equals(that.name) : that.name != null) + return false; + if (filter != null ? !filter.equals(that.filter) : that.filter != null) + return false; + if (pageSubscription != null ? !pageSubscription.equals(that.pageSubscription) : that.pageSubscription != null) + return false; + return user != null ? user.equals(that.user) : that.user == null; + + } + + @Override + public int hashCode() { + int result = (int) (id ^ (id >>> 32)); + result = 31 * result + (address != null ? address.hashCode() : 0); + result = 31 * result + (name != null ? name.hashCode() : 0); + result = 31 * result + (filter != null ? filter.hashCode() : 0); + result = 31 * result + (pageSubscription != null ? pageSubscription.hashCode() : 0); + result = 31 * result + (user != null ? user.hashCode() : 0); + result = 31 * result + (durable ? 1 : 0); + result = 31 * result + (temporary ? 1 : 0); + result = 31 * result + (autoCreated ? 1 : 0); + return result; + } + + @Override + public String toString() { + return "QueueConfig{" + "id=" + id + ", address=" + address + ", name=" + name + ", filter=" + filter + ", pageSubscription=" + pageSubscription + ", user=" + user + ", durable=" + durable + ", temporary=" + temporary + ", autoCreated=" + autoCreated + '}'; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java index 5e9f9f1..64e7a5d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueFactory.java @@ -23,12 +23,18 @@ import org.apache.activemq.artemis.core.postoffice.PostOffice; /** * A QueueFactory - * + * <p> * Implementations of this class know how to create queues with the correct attribute values * based on default and overrides */ public interface QueueFactory { + Queue createQueueWith(final QueueConfig config); + + /** + * @deprecated Replaced by {@link #createQueueWith} + */ + @Deprecated Queue createQueue(long persistenceID, final SimpleString address, SimpleString name, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 38005ed..9bf084d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -113,6 +113,7 @@ import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.PostQueueCreationCallback; import org.apache.activemq.artemis.core.server.PostQueueDeletionCallback; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueCreator; import org.apache.activemq.artemis.core.server.QueueDeleter; import org.apache.activemq.artemis.core.server.QueueFactory; @@ -172,8 +173,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { * subscription with this filter. For that reason, this filter needs to be rejected on paging or * any other component on the system, and just be ignored for any purpose It's declared here as * this filter is considered a global ignore + * + * @deprecated Replaced by {@link org.apache.activemq.artemis.core.filter.Filter#GENERIC_IGNORED_FILTER} */ - public static final String GENERIC_IGNORED_FILTER = "__AMQX=-1"; + @Deprecated + public static final String GENERIC_IGNORED_FILTER = Filter.GENERIC_IGNORED_FILTER; private HAPolicy haPolicy; @@ -184,22 +188,19 @@ public class ActiveMQServerImpl implements ActiveMQServer { * {@link SERVER_STATE#STOPPED}, so that methods testing for these two values such as * {@link #stop(boolean)} worked as intended. */ - STARTING, - /** + STARTING, /** * server is started. {@code server.isStarted()} returns {@code true}, and all assumptions * about it hold. */ - STARTED, - /** + STARTED, /** * stop() was called but has not finished yet. Meant to avoids starting components while * stop() is executing. */ - STOPPING, - /** + STOPPING, /** * Stopped: either stop() has been called and has finished running, or start() has never been * called. */ - STOPPED; + STOPPED } private volatile SERVER_STATE state = SERVER_STATE.STOPPED; @@ -1290,10 +1291,11 @@ public class ActiveMQServerImpl implements ActiveMQServer { SessionCallback callback, OperationContext context, boolean autoCreateJMSQueues) throws Exception { - return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), - xa, connection, storageManager, postOffice, resourceManager, securityStore, managementService, this, configuration.getManagementAddress(), - defaultAddress == null ? null : new SimpleString(defaultAddress), callback, context, autoCreateJMSQueues ? jmsQueueCreator : null, - pagingManager); + return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, + autoCommitAcks, preAcknowledge, configuration.isPersistDeliveryCountBeforeDelivery(), xa, + connection, storageManager, postOffice, resourceManager, securityStore, managementService, + this, configuration.getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), + callback, context, autoCreateJMSQueues ? jmsQueueCreator : null, pagingManager); } @Override @@ -1370,7 +1372,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { for (Binding binding : postOffice.getAllBindings().values()) { if (binding.getType() == BindingType.LOCAL_QUEUE) { - total += ((LocalQueueBinding)binding).getQueue().getMessageCount(); + total += ((LocalQueueBinding) binding).getQueue().getMessageCount(); } } @@ -1383,7 +1385,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { for (Binding binding : postOffice.getAllBindings().values()) { if (binding.getType() == BindingType.LOCAL_QUEUE) { - total += ((LocalQueueBinding)binding).getQueue().getMessagesAdded(); + total += ((LocalQueueBinding) binding).getQueue().getMessagesAdded(); } } @@ -1396,7 +1398,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { for (Binding binding : postOffice.getAllBindings().values()) { if (binding.getType() == BindingType.LOCAL_QUEUE) { - total += ((LocalQueueBinding)binding).getQueue().getMessagesAcknowledged(); + total += ((LocalQueueBinding) binding).getQueue().getMessagesAcknowledged(); } } @@ -1409,7 +1411,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { for (Binding binding : postOffice.getAllBindings().values()) { if (binding.getType() == BindingType.LOCAL_QUEUE) { - total += ((LocalQueueBinding)binding).getQueue().getConsumerCount(); + total += ((LocalQueueBinding) binding).getQueue().getConsumerCount(); } } @@ -1461,25 +1463,17 @@ public class ActiveMQServerImpl implements ActiveMQServer { return createQueue(address, queueName, filterString, user, durable, temporary, false, false, autoCreated); } - /** - * Creates a transient queue. A queue that will exist as long as there are consumers. - * The queue will be deleted as soon as all the consumers are removed. - * <p> - * Notice: the queue won't be deleted until the first consumer arrives. - * - * @param address - * @param name - * @param filterString - * @param durable - * @throws Exception - */ @Override public void createSharedQueue(final SimpleString address, final SimpleString name, final SimpleString filterString, final SimpleString user, boolean durable) throws Exception { - Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false); + //force the old contract about address + if (address == null) { + throw new NullPointerException("address can't be null!"); + } + final Queue queue = createQueue(address, name, filterString, user, durable, !durable, true, !durable, false); if (!queue.getAddress().equals(address)) { throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name); @@ -1490,8 +1484,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } if (logger.isDebugEnabled()) { - logger.debug("Transient Queue " + name + " created on address " + name + - " with filter=" + filterString); + logger.debug("Transient Queue " + name + " created on address " + name + " with filter=" + filterString); } } @@ -1653,7 +1646,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override - public void callPostQueueDeletionCallbacks(final SimpleString address, final SimpleString queueName) throws Exception { + public void callPostQueueDeletionCallbacks(final SimpleString address, + final SimpleString queueName) throws Exception { for (PostQueueDeletionCallback callback : postQueueDeletionCallbacks) { callback.callback(address, queueName); } @@ -1933,8 +1927,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { storageManager = createStorageManager(); - if (configuration.getClusterConfigurations().size() > 0 && - ActiveMQDefaultConfiguration.getDefaultClusterUser().equals(configuration.getClusterUser()) && ActiveMQDefaultConfiguration.getDefaultClusterPassword().equals(configuration.getClusterPassword())) { + if (configuration.getClusterConfigurations().size() > 0 && ActiveMQDefaultConfiguration.getDefaultClusterUser().equals(configuration.getClusterUser()) && ActiveMQDefaultConfiguration.getDefaultClusterPassword().equals(configuration.getClusterPassword())) { ActiveMQServerLogger.LOGGER.clusterSecurityRisk(); } @@ -1984,7 +1977,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { reloadManager.addCallback(configuration.getConfigurationUrl(), new ConfigurationFileReloader()); } - return true; } @@ -2066,7 +2058,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } - /** This method exists for a possibility of test cases replacing the FileStoreMonitor for an extension that would for instance pretend a disk full on certain tests. */ + /** + * This method exists for a possibility of test cases replacing the FileStoreMonitor for an extension that would for instance pretend a disk full on certain tests. + */ public void injectMonitor(FileStoreMonitor storeMonitor) throws Exception { this.fileStoreMonitor = storeMonitor; pagingManager.injectMonitor(storeMonitor); @@ -2109,7 +2103,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { addressCount++; } - long maxMemory = Runtime.getRuntime().maxMemory(); if (totalMaxSizeBytes >= maxMemory && configuration.getGlobalMaxSize() < 0) { ActiveMQServerLogger.LOGGER.potentialOOME(addressCount, totalMaxSizeBytes, maxMemory); @@ -2201,8 +2194,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean ignoreIfExists, final boolean transientQueue, final boolean autoCreated) throws Exception { - QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); - + final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); if (binding != null) { if (ignoreIfExists) { return binding.getQueue(); @@ -2212,38 +2204,37 @@ public class ActiveMQServerImpl implements ActiveMQServer { } } - Filter filter = FilterImpl.createFilter(filterString); - - long txID = storageManager.generateID(); - long queueID = storageManager.generateID(); + final Filter filter = FilterImpl.createFilter(filterString); - PageSubscription pageSubscription; + final long txID = storageManager.generateID(); + final long queueID = storageManager.generateID(); - if (filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER)) { - pageSubscription = null; + final QueueConfig.Builder queueConfigBuilder; + if (address == null) { + queueConfigBuilder = QueueConfig.builderWith(queueID, queueName); } else { - pageSubscription = pagingManager.getPageStore(address).getCursorProvider().createSubscription(queueID, filter, durable); - } - - final Queue queue = queueFactory.createQueue(queueID, address, queueName, filter, pageSubscription, user, durable, temporary, autoCreated); + queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, address); + } + final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build(); + final Queue queue = queueFactory.createQueueWith(queueConfig); if (transientQueue) { - queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queueName)); + queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName())); } - else if (autoCreated) { - queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queueName)); + else if (queue.isAutoCreated()) { + queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(this.getJMSQueueDeleter(), queue.getName())); } - binding = new LocalQueueBinding(address, queue, nodeManager.getNodeId()); + final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); - if (durable) { - storageManager.addQueueBinding(txID, binding); + if (queue.isDurable()) { + storageManager.addQueueBinding(txID, localQueueBinding); } try { - postOffice.addBinding(binding); - if (durable) { + postOffice.addBinding(localQueueBinding); + if (queue.isDurable()) { storageManager.commitBindings(txID); } } @@ -2252,11 +2243,14 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (durable) { storageManager.rollbackBindings(txID); } - if (queue != null) { + final PageSubscription pageSubscription = queue.getPageSubscription(); + try { queue.close(); } - if (pageSubscription != null) { - pageSubscription.destroy(); + finally { + if (pageSubscription != null) { + pageSubscription.destroy(); + } } } catch (Throwable ignored) { @@ -2265,10 +2259,10 @@ public class ActiveMQServerImpl implements ActiveMQServer { throw e; } - managementService.registerAddress(address); - managementService.registerQueue(queue, address, storageManager); + managementService.registerAddress(queue.getAddress()); + managementService.registerQueue(queue, queue.getAddress(), storageManager); - callPostQueueCreationCallbacks(queueName); + callPostQueueCreationCallbacks(queue.getName()); return queue; } @@ -2423,6 +2417,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } private final class ActivationThread extends Thread { + final Runnable runnable; ActivationThread(Runnable runnable, String name) { @@ -2444,6 +2439,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { } private final class ConfigurationFileReloader implements ReloadCallback { + @Override public void reload(URL uri) throws Exception { Configuration config = new FileConfigurationParser().parseMainConfig(uri.openStream()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index 0645dca..ff93ffe 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -29,12 +29,12 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.filter.Filter; +import org.apache.activemq.artemis.core.filter.FilterUtils; import org.apache.activemq.artemis.core.filter.impl.FilterImpl; import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.paging.PagedMessage; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingStore; -import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.paging.cursor.PageSubscriptionCounter; import org.apache.activemq.artemis.core.paging.impl.Page; import org.apache.activemq.artemis.core.persistence.GroupingInfo; @@ -51,6 +51,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.group.GroupingHandler; @@ -67,12 +68,12 @@ public class PostOfficeJournalLoader implements JournalLoader { protected final PostOffice postOffice; protected final PagingManager pagingManager; - private StorageManager storageManager; + private final StorageManager storageManager; private final QueueFactory queueFactory; protected final NodeManager nodeManager; private final ManagementService managementService; private final GroupingHandler groupingHandler; - private Configuration configuration; + private final Configuration configuration; private Map<Long, Queue> queues; public PostOfficeJournalLoader(PostOffice postOffice, @@ -113,50 +114,45 @@ public class PostOfficeJournalLoader implements JournalLoader { public void initQueues(Map<Long, QueueBindingInfo> queueBindingInfosMap, List<QueueBindingInfo> queueBindingInfos) throws Exception { int duplicateID = 0; - for (QueueBindingInfo queueBindingInfo : queueBindingInfos) { + for (final QueueBindingInfo queueBindingInfo : queueBindingInfos) { queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo); - Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString()); + final Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString()); - boolean isTopicIdentification = filter != null && filter.getFilterString() != null && - filter.getFilterString().toString().equals(ActiveMQServerImpl.GENERIC_IGNORED_FILTER); + final boolean isTopicIdentification = FilterUtils.isTopicIdentification(filter); if (postOffice.getBinding(queueBindingInfo.getQueueName()) != null) { if (isTopicIdentification) { - long tx = storageManager.generateID(); + final long tx = storageManager.generateID(); storageManager.deleteQueueBinding(tx, queueBindingInfo.getId()); storageManager.commitBindings(tx); continue; } else { - - SimpleString newName = queueBindingInfo.getQueueName().concat("-" + (duplicateID++)); + final SimpleString newName = queueBindingInfo.getQueueName().concat("-" + (duplicateID++)); ActiveMQServerLogger.LOGGER.queueDuplicatedRenaming(queueBindingInfo.getQueueName().toString(), newName.toString()); queueBindingInfo.replaceQueueName(newName); } } - - PageSubscription subscription = null; - - if (!isTopicIdentification) { - subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvider().createSubscription(queueBindingInfo.getId(), filter, true); + final QueueConfig.Builder queueConfigBuilder; + if (queueBindingInfo.getAddress() == null) { + queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName()); } - - Queue queue = queueFactory.createQueue(queueBindingInfo.getId(), queueBindingInfo.getAddress(), queueBindingInfo.getQueueName(), filter, subscription, queueBindingInfo.getUser(), true, false, queueBindingInfo.isAutoCreated()); - - if (queueBindingInfo.isAutoCreated()) { + else { + queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName(), queueBindingInfo.getAddress()); + } + queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(queueBindingInfo.getUser()).durable(true).temporary(false).autoCreated(queueBindingInfo.isAutoCreated()); + final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build()); + if (queue.isAutoCreated()) { queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName())); } - Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId()); - - queues.put(queueBindingInfo.getId(), queue); - + final Binding binding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId()); + queues.put(queue.getID(), queue); postOffice.addBinding(binding); - - managementService.registerAddress(queueBindingInfo.getAddress()); - managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager); + managementService.registerAddress(queue.getAddress()); + managementService.registerQueue(queue, queue.getAddress(), storageManager); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 280bb13..d8f772d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -66,6 +67,20 @@ public class QueueFactoryImpl implements QueueFactory { } @Override + public Queue createQueueWith(final QueueConfig config) { + final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString()); + final Queue queue; + if (addressSettings.isLastValueQueue()) { + queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + } + else { + queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + } + return queue; + } + + @Deprecated + @Override public Queue createQueue(final long persistenceID, final SimpleString address, final SimpleString name, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 4d4abd7..73aa20b 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -38,7 +38,8 @@ minOccurs="0"> <xsd:annotation> <xsd:documentation> - If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the + If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available + on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers. </xsd:documentation> @@ -154,7 +155,8 @@ </xsd:element> <xsd:element name="password-codec" type="xsd:string" - default="org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec" maxOccurs="1" minOccurs="0"> + default="org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec" maxOccurs="1" + minOccurs="0"> <xsd:annotation> <xsd:documentation> Class name and its parameters for the Decoder used to decode the masked password. Ignored if @@ -199,9 +201,9 @@ <xsd:element name="jmx-use-broker-name" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0"> <xsd:annotation> - <xsd:documentation> - Whether or not to use the broker name in the JMX properties - </xsd:documentation> + <xsd:documentation> + Whether or not to use the broker name in the JMX properties + </xsd:documentation> </xsd:annotation> </xsd:element> @@ -246,7 +248,8 @@ </xsd:annotation> </xsd:element> - <xsd:element name="configuration-file-refresh-period" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0"> + <xsd:element name="configuration-file-refresh-period" type="xsd:long" default="5000" maxOccurs="1" + minOccurs="0"> <xsd:annotation> <xsd:documentation> how often (in ms) to check the configuration file for modifications @@ -432,7 +435,7 @@ <xsd:element name="queue" maxOccurs="unbounded" minOccurs="0"> <xsd:complexType> <xsd:all> - <xsd:element name="address" type="xsd:string" maxOccurs="1" minOccurs="1"> + <xsd:element name="address" type="xsd:string" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> address for the queue @@ -679,7 +682,8 @@ <xsd:element name="global-max-size" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> - Global Max Size before all addresses will enter into their Full Policy configured upon messages being produced. + Global Max Size before all addresses will enter into their Full Policy configured upon messages being + produced. </xsd:documentation> </xsd:annotation> </xsd:element> @@ -1233,11 +1237,11 @@ <xsd:complexType name="clusterConnectionChoiceType"> <xsd:sequence> - <xsd:choice maxOccurs="unbounded"> - <xsd:element name="cluster-connection-uri" type="cluster-connectionUriType"/> - <xsd:element name="cluster-connection" type="cluster-connectionType"> - </xsd:element> - </xsd:choice> + <xsd:choice maxOccurs="unbounded"> + <xsd:element name="cluster-connection-uri" type="cluster-connectionUriType"/> + <xsd:element name="cluster-connection" type="cluster-connectionType"> + </xsd:element> + </xsd:choice> </xsd:sequence> </xsd:complexType> @@ -1350,7 +1354,7 @@ </xsd:element> <xsd:element name="use-duplicate-detection" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0"> - <xsd:annotation > + <xsd:annotation> <xsd:documentation> should duplicate detection headers be inserted in forwarded messages? </xsd:documentation> @@ -1360,7 +1364,8 @@ <xsd:element name="forward-when-no-consumers" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> - DEPRECATED: use message-load-balancing-type instead. Select STRICT to mimic forward-when-no-consumers=true + DEPRECATED: use message-load-balancing-type instead. Select STRICT to mimic + forward-when-no-consumers=true and ON_DEMAND to mimic forward-when-no-consumers=false. </xsd:documentation> </xsd:annotation> @@ -1858,7 +1863,8 @@ </xsd:documentation> </xsd:annotation> </xsd:element> - <xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" minOccurs="0"> + <xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" + minOccurs="0"> <xsd:annotation> <xsd:documentation> The amount of time to wait for the replica to acknowledge it has received all the necessary data from @@ -1923,11 +1929,13 @@ <xsd:element name="failback-delay" type="xsd:long" default="5000" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> - DEPRECATED: if we have to start as a replicated server this is the delay to wait before fail-back occurs + DEPRECATED: if we have to start as a replicated server this is the delay to wait before fail-back + occurs </xsd:documentation> </xsd:annotation> </xsd:element> - <xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" minOccurs="0"> + <xsd:element name="initial-replication-sync-timeout" type="xsd:long" default="30000" maxOccurs="1" + minOccurs="0"> <xsd:annotation> <xsd:documentation> If we have to start as a replicated server this is the amount of time to wait for the replica to @@ -2099,31 +2107,33 @@ </xsd:annotation> </xsd:element> <xsd:choice> - <xsd:element name="discovery-group-ref" maxOccurs="1" minOccurs="0"> - <xsd:complexType> - <xsd:attribute name="discovery-group-name" type="xsd:IDREF" use="required"> - <xsd:annotation> - <xsd:documentation> - The discovery group to use for scale down, if not supplied then the scale-down-connectors or first - invm connector will be used - </xsd:documentation> - </xsd:annotation> - </xsd:attribute> - </xsd:complexType> - </xsd:element> - <xsd:element name="connectors" minOccurs="0" maxOccurs="1"> - <xsd:annotation> - <xsd:documentation> - A list of connectors to use for scaling down, if not supplied then the scale-down-discovery-group or - first invm connector will be used - </xsd:documentation> - </xsd:annotation> - <xsd:complexType> - <xsd:sequence> - <xsd:element name="connector-ref" type="xsd:string" maxOccurs="unbounded" minOccurs="1"/> - </xsd:sequence> - </xsd:complexType> - </xsd:element> + <xsd:element name="discovery-group-ref" maxOccurs="1" minOccurs="0"> + <xsd:complexType> + <xsd:attribute name="discovery-group-name" type="xsd:IDREF" use="required"> + <xsd:annotation> + <xsd:documentation> + The discovery group to use for scale down, if not supplied then the scale-down-connectors or + first + invm connector will be used + </xsd:documentation> + </xsd:annotation> + </xsd:attribute> + </xsd:complexType> + </xsd:element> + <xsd:element name="connectors" minOccurs="0" maxOccurs="1"> + <xsd:annotation> + <xsd:documentation> + A list of connectors to use for scaling down, if not supplied then the scale-down-discovery-group + or + first invm connector will be used + </xsd:documentation> + </xsd:annotation> + <xsd:complexType> + <xsd:sequence> + <xsd:element name="connector-ref" type="xsd:string" maxOccurs="unbounded" minOccurs="1"/> + </xsd:sequence> + </xsd:complexType> + </xsd:element> </xsd:choice> </xsd:sequence> </xsd:complexType> @@ -2252,15 +2262,19 @@ <xsd:element name="max-size-bytes" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> <xsd:annotation> <xsd:documentation> - the maximum size (in bytes) for an address (-1 means no limits). This is used in PAGING, BLOCK and FAIL policies. + the maximum size (in bytes) for an address (-1 means no limits). This is used in PAGING, BLOCK and + FAIL policies. </xsd:documentation> </xsd:annotation> </xsd:element> - <xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0"> + <xsd:element name="max-size-bytes-reject-threshold" type="xsd:long" default="-1" maxOccurs="1" + minOccurs="0"> <xsd:annotation> <xsd:documentation> - used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before messages start getting rejected. Works in combination with max-size-bytes for AMQP protocol only. Default = -1 (no limit). + used with the address full BLOCK policy, the maximum size (in bytes) an address can reach before + messages start getting rejected. Works in combination with max-size-bytes for AMQP protocol only. + Default = -1 (no limit). </xsd:documentation> </xsd:annotation> </xsd:element> @@ -2491,11 +2505,11 @@ </xsd:complexType> <xsd:complexType name="transportType"> - <xsd:simpleContent> + <xsd:simpleContent> <xsd:extension base="xsd:string"> - <xsd:attribute name="name" type="xsd:string"> - </xsd:attribute> + <xsd:attribute name="name" type="xsd:string"> + </xsd:attribute> </xsd:extension> - </xsd:simpleContent> - </xsd:complexType> + </xsd:simpleContent> + </xsd:complexType> </xsd:schema> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java new file mode 100644 index 0000000..62e396d --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/QueueConfigTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.junit.Assert; +import org.junit.Test; + +public class QueueConfigTest { + + @Test + public void addressMustBeDefaultedToName() { + final QueueConfig queueConfig = QueueConfig.builderWith(1L, new SimpleString("queue_name")).build(); + Assert.assertEquals(queueConfig.name(), queueConfig.address()); + } + + @Test(expected = IllegalStateException.class) + public void cannotAllowNullAddress() { + QueueConfig.builderWith(1L, new SimpleString("queue_name"), null); + } + + @Test(expected = IllegalStateException.class) + public void cannotAllowNullNameWithoutAddress() { + QueueConfig.builderWith(1L, null); + } + + @Test(expected = IllegalStateException.class) + public void cannotAllowNullNameWithAddress() { + QueueConfig.builderWith(1L, null, new SimpleString("queue_address")); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java index 89432d9..4ddabec 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java @@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionRec import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; @@ -255,6 +256,13 @@ public class HangConsumerTest extends ActiveMQTestBase { } @Override + public Queue createQueueWith(final QueueConfig config) { + queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor()); + return queue; + } + + @Deprecated + @Override public Queue createQueue(final long persistenceID, final SimpleString address, final SimpleString name, @@ -535,7 +543,11 @@ public class HangConsumerTest extends ActiveMQTestBase { * @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int) */ @Override - public int sendLargeMessage(MessageReference reference, ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) { + public int sendLargeMessage(MessageReference reference, + ServerMessage message, + ServerConsumer consumer, + long bodySize, + int deliveryCount) { return targetCallback.sendLargeMessage(reference, message, consumer, bodySize, deliveryCount); } @@ -567,9 +579,7 @@ public class HangConsumerTest extends ActiveMQTestBase { class MyActiveMQServer extends ActiveMQServerImpl { - MyActiveMQServer(Configuration configuration, - MBeanServer mbeanServer, - ActiveMQSecurityManager securityManager) { + MyActiveMQServer(Configuration configuration, MBeanServer mbeanServer, ActiveMQSecurityManager securityManager) { super(configuration, mbeanServer, securityManager); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java index a4be3ad..f7b1c41 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InterruptedLargeMessageTest.java @@ -33,18 +33,14 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.StoreConfiguration; -import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; -import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; -import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -54,6 +50,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCon import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; @@ -61,7 +58,11 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; +import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ExecutorFactory; import org.junit.Assert; import org.junit.Before; @@ -208,15 +209,12 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { server.stop(); } - - @Test public void testForcedInterruptUsingJMS() throws Exception { ActiveMQServer server = createServer(true, isNetty()); server.start(); - SimpleString jmsAddress = new SimpleString("jms.queue.Test"); server.createQueue(jmsAddress, jmsAddress, null, true, false); @@ -265,7 +263,6 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { server.stop(); } - @Test public void testSendNonPersistentQueue() throws Exception { @@ -540,7 +537,7 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { } } - class NoPostACKQueueFactory implements QueueFactory { + final class NoPostACKQueueFactory implements QueueFactory { final StorageManager storageManager; @@ -565,6 +562,12 @@ public class InterruptedLargeMessageTest extends LargeMessageTestBase { } @Override + public Queue createQueueWith(final QueueConfig config) { + return new NoPostACKQueue(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, execFactory.getExecutor()); + } + + @Deprecated + @Override public Queue createQueue(long persistenceID, SimpleString address, SimpleString name, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java ---------------------------------------------------------------------- diff --git a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java index adf97fc..34433ff 100644 --- a/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java +++ b/tests/timing-tests/src/test/java/org/apache/activemq/artemis/tests/timing/core/server/impl/QueueConcurrentTest.java @@ -23,6 +23,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.HandleStatus; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.tests.unit.UnitTestLogger; @@ -63,7 +64,8 @@ public class QueueConcurrentTest extends ActiveMQTestBase { */ @Test public void testConcurrentAddsDeliver() throws Exception { - QueueImpl queue = (QueueImpl) queueFactory.createQueue(1, new SimpleString("address1"), new SimpleString("queue1"), null, null, null, false, false, false); + + QueueImpl queue = (QueueImpl) queueFactory.createQueueWith(QueueConfig.builderWith(1, new SimpleString("address1"), new SimpleString("queue1")).durable(false).temporary(false).autoCreated(false).build()); FakeConsumer consumer = new FakeConsumer(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c002cf13/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java ---------------------------------------------------------------------- diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java index b507d3e..06c7e1e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeQueueFactory.java @@ -25,11 +25,12 @@ import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.paging.cursor.PageSubscription; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; -public class FakeQueueFactory implements QueueFactory { +public final class FakeQueueFactory implements QueueFactory { private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(ActiveMQThreadFactory.defaultThreadFactory()); @@ -38,6 +39,12 @@ public class FakeQueueFactory implements QueueFactory { private PostOffice postOffice; @Override + public Queue createQueueWith(final QueueConfig config) { + return new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, null, null, executor); + } + + @Deprecated + @Override public Queue createQueue(final long persistenceID, final SimpleString address, final SimpleString name,