Repository: activemq-artemis Updated Branches: refs/heads/master 4f1e74b7f -> c811ccbeb
ARTEMIS-2065 Can't change queue routing-type between restarts Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3827c54c Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3827c54c Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3827c54c Branch: refs/heads/master Commit: 3827c54c058e7fef062d6d5e7e97e3f6466361fe Parents: 4f1e74b Author: Justin Bertram <jbert...@apache.org> Authored: Thu Aug 30 13:21:07 2018 -0500 Committer: Justin Bertram <jbert...@apache.org> Committed: Thu Aug 30 14:31:18 2018 -0500 ---------------------------------------------------------------------- .../core/server/ActiveMQServerLogger.java | 7 +- .../core/server/impl/ActiveMQServerImpl.java | 25 +++--- .../tests/integration/jms/RedeployTest.java | 44 +++++++++- .../persistence/ConfigChangeTest.java | 86 ++++++++++++++++++++ .../reload-queue-routingtype-updated.xml | 40 +++++++++ .../test/resources/reload-queue-routingtype.xml | 40 +++++++++ 6 files changed, 225 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3827c54c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index e6b7b48..2cc00d3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -44,6 +44,7 @@ import io.netty.channel.Channel; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.config.Configuration; @@ -1898,12 +1899,12 @@ public interface ActiveMQServerLogger extends BasicLogger { void criticalSystemLog(Object component); @LogMessage(level = Logger.Level.INFO) - @Message(id = 224076, value = "UnDeploying address {0}", format = Message.Format.MESSAGE_FORMAT) + @Message(id = 224076, value = "Undeploying address {0}", format = Message.Format.MESSAGE_FORMAT) void undeployAddress(SimpleString addressName); @LogMessage(level = Logger.Level.INFO) - @Message(id = 224077, value = "UnDeploying queue {0}", format = Message.Format.MESSAGE_FORMAT) - void undeployQueue(SimpleString queueName); + @Message(id = 224077, value = "Undeploying {0} queue {1}", format = Message.Format.MESSAGE_FORMAT) + void undeployQueue(RoutingType routingType, SimpleString queueName); @LogMessage(level = Logger.Level.WARN) @Message(id = 224078, value = "The size of duplicate cache detection (<id_cache-size/>) appears to be too large {0}. It should be no greater than the number of messages that can be squeezed into conformation buffer (<confirmation-window-size/>) {1}.", format = Message.Format.MESSAGE_FORMAT) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3827c54c/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 6cb0515..715f984 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -2601,6 +2601,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { }, 0, dumpInfoInterval, TimeUnit.MILLISECONDS); } + // Undeploy any addresses and queues not in config + undeployAddressesAndQueueNotInConfiguration(); + // Deploy the rest of the stuff // Deploy predefined addresses @@ -2609,9 +2612,6 @@ public class ActiveMQServerImpl implements ActiveMQServer { // Deploy any predefined queues deployQueuesFromConfiguration(); - // Undeploy any addresses and queues not in config - undeployAddressesAndQueueNotInConfiguration(); - // We need to call this here, this gives any dependent server a chance to deploy its own addresses // this needs to be done before clustering is fully activated callActivateCallbacks(); @@ -2698,25 +2698,28 @@ public class ActiveMQServerImpl implements ActiveMQServer { .map(CoreAddressConfiguration::getName) .collect(Collectors.toSet()); - Set<String> queuesInConfig = configuration.getAddressConfigurations().stream() - .map(CoreAddressConfiguration::getQueueConfigurations) - .flatMap(List::stream).map(CoreQueueConfiguration::getName) - .collect(Collectors.toSet()); + Set<String> queuesInConfig = new HashSet<>(); + for (CoreAddressConfiguration cac : configuration.getAddressConfigurations()) { + for (CoreQueueConfiguration cqc : cac.getQueueConfigurations()) { + // combine the routing-type and queue name as the unique identifier as it's possible to change the routing-type without changing the name + queuesInConfig.add(cqc.getRoutingType().toString() + cqc.getName()); + } + } for (SimpleString addressName : listAddressNames()) { AddressSettings addressSettings = getAddressSettingsRepository().getMatch(addressName.toString()); if (!addressesInConfig.contains(addressName.toString()) && addressSettings.getConfigDeleteAddresses() == DeletionPolicy.FORCE) { for (Queue queue : listQueues(addressName)) { - ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); + ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName()); queue.deleteQueue(true); } ActiveMQServerLogger.LOGGER.undeployAddress(addressName); removeAddressInfo(addressName, null); } else if (addressSettings.getConfigDeleteQueues() == DeletionPolicy.FORCE) { for (Queue queue : listConfiguredQueues(addressName)) { - if (!queuesInConfig.contains(queue.getName().toString())) { - ActiveMQServerLogger.LOGGER.undeployQueue(queue.getName()); + if (!queuesInConfig.contains(queue.getRoutingType().toString() + queue.getName().toString())) { + ActiveMQServerLogger.LOGGER.undeployQueue(queue.getRoutingType(), queue.getName()); queue.deleteQueue(true); } } @@ -3441,8 +3444,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { } ActiveMQServerLogger.LOGGER.reloadingConfiguration("addresses"); - deployAddressesFromConfiguration(config); undeployAddressesAndQueueNotInConfiguration(config); + deployAddressesFromConfiguration(config); configuration.setAddressConfigurations(config.getAddressConfigurations()); configuration.setQueueConfigurations(config.getQueueConfigurations()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3827c54c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java index e685568..1d8e7b9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.security.Role; @@ -196,9 +197,6 @@ public class RedeployTest extends ActiveMQTestBase { } - - - @Test public void testRedeployAddressQueue() throws Exception { Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); @@ -268,6 +266,46 @@ public class RedeployTest extends ActiveMQTestBase { } } + @Test + public void testRedeployChangeQueueRoutingType() throws Exception { + Path brokerXML = getTestDirfile().toPath().resolve("broker.xml"); + URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype.xml"); + URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype-updated.xml"); + Files.copy(url1.openStream(), brokerXML); + + EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ(); + embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString()); + embeddedActiveMQ.start(); + + final ReusableLatch latch = new ReusableLatch(1); + + Runnable tick = new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }; + + embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick); + + try { + latch.await(10, TimeUnit.SECONDS); + Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress")); + Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType()); + + Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING); + brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000); + latch.setCount(1); + embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick); + latch.await(10, TimeUnit.SECONDS); + + Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress")); + Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType()); + } finally { + embeddedActiveMQ.stop(); + } + } + /** http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3827c54c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java new file mode 100644 index 0000000..3a2264f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.persistence; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.junit.Test; + +public class ConfigChangeTest extends ActiveMQTestBase { + + private ActiveMQServer server; + + @Test + public void testChangeQueueRoutingTypeOnRestart() throws Exception { + internalTestChangeQueueRoutingTypeOnRestart(false); + } + + @Test + public void testChangeQueueRoutingTypeOnRestartNegative() throws Exception { + internalTestChangeQueueRoutingTypeOnRestart(true); + } + + public void internalTestChangeQueueRoutingTypeOnRestart(boolean negative) throws Exception { + // if negative == true then the queue's routing type should *not* change + + Configuration configuration = createDefaultInVMConfig(); + configuration.addAddressesSetting("#", new AddressSettings() + .setConfigDeleteQueues(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE) + .setConfigDeleteAddresses(negative ? DeletionPolicy.OFF : DeletionPolicy.FORCE)); + + List addressConfigurations = new ArrayList(); + CoreAddressConfiguration addressConfiguration = new CoreAddressConfiguration() + .setName("myAddress") + .addRoutingType(RoutingType.ANYCAST) + .addQueueConfiguration(new CoreQueueConfiguration() + .setName("myQueue") + .setAddress("myAddress") + .setRoutingType(RoutingType.ANYCAST)); + addressConfigurations.add(addressConfiguration); + configuration.setAddressConfigurations(addressConfigurations); + server = createServer(true, configuration); + server.start(); + server.stop(); + + addressConfiguration = new CoreAddressConfiguration() + .setName("myAddress") + .addRoutingType(RoutingType.MULTICAST) + .addQueueConfiguration(new CoreQueueConfiguration() + .setName("myQueue") + .setAddress("myAddress") + .setRoutingType(RoutingType.MULTICAST)); + addressConfigurations.clear(); + addressConfigurations.add(addressConfiguration); + configuration.setAddressConfigurations(addressConfigurations); + + server.start(); + assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.getAddressInfo(SimpleString.toSimpleString("myAddress")).getRoutingType()); + assertEquals(negative ? RoutingType.ANYCAST : RoutingType.MULTICAST, server.locateQueue(SimpleString.toSimpleString("myQueue")).getRoutingType()); + server.stop(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3827c54c/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml new file mode 100644 index 0000000..e5bbe4f --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml @@ -0,0 +1,40 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + <address-settings> + <address-setting match="#"> + <config-delete-queues>FORCE</config-delete-queues> + </address-setting> + </address-settings> + + <addresses> + <address name="myAddress"> + <anycast> + <queue name="myQueue"/> + </anycast> + </address> + </addresses> + </core> +</configuration> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3827c54c/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml new file mode 100644 index 0000000..61ae86a --- /dev/null +++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml @@ -0,0 +1,40 @@ +<?xml version='1.0'?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<configuration xmlns="urn:activemq" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd"> + + <core xmlns="urn:activemq:core"> + <address-settings> + <address-setting match="#"> + <config-delete-queues>FORCE</config-delete-queues> + </address-setting> + </address-settings> + + <addresses> + <address name="myAddress"> + <multicast> + <queue name="myQueue"/> + </multicast> + </address> + </addresses> + </core> +</configuration>