Add 'routing-type' to divert
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/77684850 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/77684850 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/77684850 Branch: refs/heads/ARTEMIS-780 Commit: 77684850b19cf0e5c167381d58bea7a9d04dfbff Parents: f575900 Author: jbertram <jbert...@apache.com> Authored: Thu Dec 1 22:30:37 2016 -0600 Committer: jbertram <jbert...@apache.com> Committed: Thu Dec 1 22:43:55 2016 -0600 ---------------------------------------------------------------------- .../config/ActiveMQDefaultConfiguration.java | 10 ++ .../core/management/ActiveMQServerControl.java | 10 ++ .../api/core/management/DivertControl.java | 6 + .../artemis/core/server/RoutingType.java | 10 +- .../core/config/DivertConfiguration.java | 21 +++ .../artemis/core/config/impl/Validators.java | 14 ++ .../deployers/impl/FileConfigurationParser.java | 4 +- .../impl/ActiveMQServerControlImpl.java | 14 +- .../core/management/impl/DivertControlImpl.java | 10 ++ .../core/server/ActiveMQMessageBundle.java | 3 + .../core/server/impl/ActiveMQServerImpl.java | 2 +- .../artemis/core/server/impl/DivertImpl.java | 23 +++- .../resources/schema/artemis-configuration.xsd | 16 +++ .../tests/integration/divert/DivertTest.java | 136 +++++++++++++++++++ .../ActiveMQServerControlUsingCoreTest.java | 12 ++ .../management/DivertControlUsingCoreTest.java | 5 + 16 files changed, 291 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index e75c663..b4518fe 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -360,6 +360,9 @@ public final class ActiveMQDefaultConfiguration { // whether this is an exclusive divert private static boolean DEFAULT_DIVERT_EXCLUSIVE = false; + // how the divert should handle the message's routing type + private static String DEFAULT_DIVERT_ROUTING_TYPE = RoutingType.STRIP.toString(); + // If true then the server will request a backup on another node private static boolean DEFAULT_HAPOLICY_REQUEST_BACKUP = false; @@ -1007,6 +1010,13 @@ public final class ActiveMQDefaultConfiguration { } /** + * how the divert should handle the message's routing type + */ + public static String getDefaultDivertRoutingType() { + return DEFAULT_DIVERT_ROUTING_TYPE; + } + + /** * If true then the server will request a backup on another node */ public static boolean isDefaultHapolicyRequestBackup() { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index cd257c6..1797c9a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -899,6 +899,16 @@ public interface ActiveMQServerControl { @Parameter(name = "filterString", desc = "Filter of the divert") String filterString, @Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName) throws Exception; + @Operation(desc = "Create a Divert", impact = MBeanOperationInfo.ACTION) + void createDivert(@Parameter(name = "name", desc = "Name of the divert") String name, + @Parameter(name = "routingName", desc = "Routing name of the divert") String routingName, + @Parameter(name = "address", desc = "Address to divert from") String address, + @Parameter(name = "forwardingAddress", desc = "Address to divert to") String forwardingAddress, + @Parameter(name = "exclusive", desc = "Is the divert exclusive?") boolean exclusive, + @Parameter(name = "filterString", desc = "Filter of the divert") String filterString, + @Parameter(name = "transformerClassName", desc = "Class name of the divert's transformer") String transformerClassName, + @Parameter(name = "routingType", desc = "How should the routing-type on the diverted messages be set?") String routingType) throws Exception; + @Operation(desc = "Destroy a Divert", impact = MBeanOperationInfo.ACTION) void destroyDivert(@Parameter(name = "name", desc = "Name of the divert") String name) throws Exception; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java index c99646b..7c103ca 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/DivertControl.java @@ -65,4 +65,10 @@ public interface DivertControl { */ @Attribute(desc = "name of the org.apache.activemq.artemis.core.server.cluster.Transformer implementation associated with this divert") String getTransformerClassName(); + + /** + * Returns the routing type used by this divert. + */ + @Attribute(desc = "routing type used by this divert") + String getRoutingType(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java index 2f17335..c9b1d09 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/RoutingType.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.server; public enum RoutingType { - MULTICAST, ANYCAST; + MULTICAST, ANYCAST, STRIP, PASS; public byte getType() { switch (this) { @@ -26,6 +26,10 @@ public enum RoutingType { return 0; case ANYCAST: return 1; + case STRIP: + return 2; + case PASS: + return 3; default: return -1; } @@ -37,6 +41,10 @@ public enum RoutingType { return MULTICAST; case 1: return ANYCAST; + case 2: + return STRIP; + case 3: + return PASS; default: return null; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java index a769f17..5326c72 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/DivertConfiguration.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.config; import java.io.Serializable; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.utils.UUIDGenerator; public class DivertConfiguration implements Serializable { @@ -39,6 +40,8 @@ public class DivertConfiguration implements Serializable { private String transformerClassName = null; + private RoutingType routingType = RoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType()); + public DivertConfiguration() { } @@ -70,6 +73,10 @@ public class DivertConfiguration implements Serializable { return transformerClassName; } + public RoutingType getRoutingType() { + return routingType; + } + /** * @param name the name to set */ @@ -130,6 +137,14 @@ public class DivertConfiguration implements Serializable { return this; } + /** + * @param routingType the routingType to set + */ + public DivertConfiguration setRoutingType(final RoutingType routingType) { + this.routingType = routingType; + return this; + } + @Override public int hashCode() { final int prime = 31; @@ -141,6 +156,7 @@ public class DivertConfiguration implements Serializable { result = prime * result + ((name == null) ? 0 : name.hashCode()); result = prime * result + ((routingName == null) ? 0 : routingName.hashCode()); result = prime * result + ((transformerClassName == null) ? 0 : transformerClassName.hashCode()); + result = prime * result + ((routingType == null) ? 0 : routingType.hashCode()); return result; } @@ -185,6 +201,11 @@ public class DivertConfiguration implements Serializable { return false; } else if (!transformerClassName.equals(other.transformerClassName)) return false; + if (routingType == null) { + if (other.routingType != null) + return false; + } else if (!routingType.equals(other.routingType)) + return false; return true; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java index bc57978..3e9bb4c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.config.impl; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; @@ -164,6 +165,19 @@ public final class Validators { } }; + public static final Validator ROUTING_TYPE = new Validator() { + @Override + public void validate(final String name, final Object value) { + String val = (String) value; + if (val == null || !val.equals(RoutingType.ANYCAST.toString()) && + !val.equals(RoutingType.MULTICAST.toString()) && + !val.equals(RoutingType.PASS.toString()) && + !val.equals(RoutingType.STRIP.toString())) { + throw ActiveMQMessageBundle.BUNDLE.invalidRoutingType(val); + } + } + }; + public static final Validator MAX_QUEUE_CONSUMERS = new Validator() { @Override public void validate(String name, Object value) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 44d1a07..7b98602 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -1554,6 +1554,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { String transformerClassName = getString(e, "transformer-class-name", null, Validators.NO_CHECK); + RoutingType routingType = RoutingType.valueOf(getString(e, "routing-type", ActiveMQDefaultConfiguration.getDefaultDivertRoutingType(), Validators.ROUTING_TYPE)); + String filterString = null; NodeList children = e.getChildNodes(); @@ -1566,7 +1568,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { } } - DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName); + DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(routingType); mainConfig.getDivertConfigurations().add(config); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 82f3943..4464062 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -1895,11 +1895,23 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active final boolean exclusive, final String filterString, final String transformerClassName) throws Exception { + createDivert(name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, ActiveMQDefaultConfiguration.getDefaultDivertRoutingType()); + } + + @Override + public void createDivert(final String name, + final String routingName, + final String address, + final String forwardingAddress, + final boolean exclusive, + final String filterString, + final String transformerClassName, + final String routingType) throws Exception { checkStarted(); clearIO(); try { - DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName); + DivertConfiguration config = new DivertConfiguration().setName(name).setRoutingName(routingName).setAddress(address).setForwardingAddress(forwardingAddress).setExclusive(exclusive).setFilterString(filterString).setTransformerClassName(transformerClassName).setRoutingType(RoutingType.valueOf(routingType)); server.deployDivert(config); } finally { blockOnIO(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java index 6c47778..e87e333 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/DivertControlImpl.java @@ -99,6 +99,16 @@ public class DivertControlImpl extends AbstractControl implements DivertControl } @Override + public String getRoutingType() { + clearIO(); + try { + return configuration.getRoutingType().toString(); + } finally { + blockOnIO(); + } + } + + @Override public String getUniqueName() { clearIO(); try { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 1c20ba5..ee8f0ef 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -410,4 +410,7 @@ public interface ActiveMQMessageBundle { IllegalArgumentException invalidRoutingTypeForAddress(RoutingType routingType, String address, Set<RoutingType> supportedRoutingTypes); + + @Message(id = 119208, value = "Invalid routing type {0}", format = Message.Format.MESSAGE_FORMAT) + IllegalArgumentException invalidRoutingType(String val); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index aadcba9..aebcb9a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1802,7 +1802,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { Filter filter = FilterImpl.createFilter(config.getFilterString()); - Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()), sName, new SimpleString(config.getRoutingName()), config.isExclusive(), filter, transformer, postOffice, storageManager); + Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()), sName, new SimpleString(config.getRoutingName()), config.isExclusive(), filter, transformer, postOffice, storageManager, config.getRoutingType()); Binding binding = new DivertBinding(storageManager.generateID(), sAddress, divert); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java index 5782379..fd55521 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DivertImpl.java @@ -16,12 +16,14 @@ */ package org.apache.activemq.artemis.core.server.impl; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.filter.Filter; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.server.Divert; import org.apache.activemq.artemis.core.server.RoutingContext; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.cluster.Transformer; import org.jboss.logging.Logger; @@ -49,6 +51,8 @@ public class DivertImpl implements Divert { private final StorageManager storageManager; + private final RoutingType routingType; + public DivertImpl(final SimpleString forwardAddress, final SimpleString uniqueName, final SimpleString routingName, @@ -56,7 +60,8 @@ public class DivertImpl implements Divert { final Filter filter, final Transformer transformer, final PostOffice postOffice, - final StorageManager storageManager) { + final StorageManager storageManager, + final RoutingType routingType) { this.forwardAddress = forwardAddress; this.uniqueName = uniqueName; @@ -72,6 +77,8 @@ public class DivertImpl implements Divert { this.postOffice = postOffice; this.storageManager = storageManager; + + this.routingType = routingType; } @Override @@ -97,6 +104,20 @@ public class DivertImpl implements Divert { copy.setExpiration(message.getExpiration()); + switch (routingType) { + case ANYCAST: + copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType()); + break; + case MULTICAST: + copy.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType()); + break; + case STRIP: + copy.removeProperty(Message.HDR_ROUTING_TYPE); + break; + case PASS: + break; + } + if (transformer != null) { copy = transformer.transform(copy); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/artemis-server/src/main/resources/schema/artemis-configuration.xsd ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 7069c09..c9d1f5b 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -1568,6 +1568,22 @@ </xsd:element> <xsd:element ref="filter" maxOccurs="1" minOccurs="0"/> + + <xsd:element name="routing-type" default="STRIP" maxOccurs="1" minOccurs="0"> + <xsd:annotation> + <xsd:documentation> + how should the routing-type on the diverted messages be set? + </xsd:documentation> + </xsd:annotation> + <xsd:simpleType> + <xsd:restriction base="xsd:string"> + <xsd:enumeration value="ANYCAST"/> + <xsd:enumeration value="MULTICAST"/> + <xsd:enumeration value="STRIP"/> + <xsd:enumeration value="PASS"/> + </xsd:restriction> + </xsd:simpleType> + </xsd:element> </xsd:all> <xsd:attribute name="name" type="xsd:ID" use="required"> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java index a9501d8..8774088 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/DivertTest.java @@ -122,6 +122,142 @@ public class DivertTest extends ActiveMQTestBase { } @Test + public void testSingleNonExclusiveDivertWithRoutingType() throws Exception { + final String testAddress = "testAddress"; + + final String forwardAddress = "forwardAddress"; + + DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress); + + Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf); + + ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false)); + + server.start(); + + ServerLocator locator = createInVMNonHALocator(); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true); + + final SimpleString queueName1 = new SimpleString("queue1"); + + final SimpleString queueName2 = new SimpleString("queue2"); + + session.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false); + + session.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName2, null, false); + + session.start(); + + ClientProducer producer = session.createProducer(new SimpleString(testAddress)); + + ClientConsumer consumer1 = session.createConsumer(queueName1); + + ClientConsumer consumer2 = session.createConsumer(queueName2); + + final int numMessages = 1; + + final SimpleString propKey = new SimpleString("testkey"); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session.createMessage(false); + + message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType()); + + message.putIntProperty(propKey, i); + + producer.send(message); + } + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = consumer1.receive(DivertTest.TIMEOUT); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey)); + + message.acknowledge(); + } + + Assert.assertNull(consumer1.receiveImmediate()); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = consumer2.receive(DivertTest.TIMEOUT); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey)); + + message.acknowledge(); + } + + Assert.assertNull(consumer2.receiveImmediate()); + } + + @Test + public void testSingleExclusiveDivertWithRoutingType() throws Exception { + final String testAddress = "testAddress"; + + final String forwardAddress = "forwardAddress"; + + DivertConfiguration divertConf = new DivertConfiguration().setName("divert1").setRoutingName("divert1").setAddress(testAddress).setForwardingAddress(forwardAddress).setExclusive(true); + + Configuration config = createDefaultInVMConfig().addDivertConfiguration(divertConf); + + ActiveMQServer server = addServer(ActiveMQServers.newActiveMQServer(config, false)); + + server.start(); + + ServerLocator locator = createInVMNonHALocator(); + + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession session = sf.createSession(false, true, true); + + final SimpleString queueName1 = new SimpleString("queue1"); + + final SimpleString queueName2 = new SimpleString("queue2"); + + session.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false); + + session.createQueue(new SimpleString(testAddress), RoutingType.MULTICAST, queueName2, null, false); + + session.start(); + + ClientProducer producer = session.createProducer(new SimpleString(testAddress)); + + ClientConsumer consumer1 = session.createConsumer(queueName1); + + final int numMessages = 1; + + final SimpleString propKey = new SimpleString("testkey"); + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = session.createMessage(false); + + message.putByteProperty(Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType()); + + message.putIntProperty(propKey, i); + + producer.send(message); + } + + for (int i = 0; i < numMessages; i++) { + ClientMessage message = consumer1.receive(DivertTest.TIMEOUT); + + Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey)); + + message.acknowledge(); + } + + Assert.assertNull(consumer1.receiveImmediate()); + } + + @Test public void testSingleDivertWithExpiry() throws Exception { final String testAddress = "testAddress"; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index 985b495..280fdc4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -713,6 +713,18 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes } @Override + public void createDivert(String name, + String routingName, + String address, + String forwardingAddress, + boolean exclusive, + String filterString, + String transformerClassName, + String routingType) throws Exception { + proxy.invokeOperation("createDivert", name, routingName, address, forwardingAddress, exclusive, filterString, transformerClassName, routingType); + } + + @Override public void destroyDivert(String name) throws Exception { proxy.invokeOperation("destroyDivert", name); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/77684850/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java index 61ecda2..48528ce 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/DivertControlUsingCoreTest.java @@ -62,6 +62,11 @@ public class DivertControlUsingCoreTest extends DivertControlTest { } @Override + public String getRoutingType() { + return (String) proxy.retrieveAttributeValue("routingType"); + } + + @Override public String getUniqueName() { return (String) proxy.retrieveAttributeValue("uniqueName"); }