[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user asfgit closed the pull request at: https://github.com/apache/activemq-artemis/pull/2115 ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191947996 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -604,26 +607,39 @@ public String getAddress() { return addressSimpleString == null ? null : addressSimpleString.toString(); } + + public SimpleString cachedAddressSimpleString(String address) { + return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools); + } + @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + setAddress(cachedAddressSimpleString(address)); return this; } @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + createExtraProperties().putSimpleStringProperty(ADDRESS_PROPERTY, address); return this; } @Override public SimpleString getAddressSimpleString() { if (address == null) { - Properties properties = getProtonMessage().getProperties(); - if (properties != null) { -setAddress(properties.getTo()); - } else { -return null; + + address = createExtraProperties().getSimpleStringProperty(ADDRESS_PROPERTY); + + if (address != null) { +return address; + } + + + Properties properties = getProperties(); + if (properties != null && properties.getTo() != null) { +address = cachedAddressSimpleString(properties.getTo()); +return address; --- End diff -- @michaelandrepearce @gemmellr this made me realize a real issue though. I was always creating the extraProperties even when not needed. I added a better check now.. it's amended already. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191946396 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java --- @@ -355,10 +355,27 @@ default Message setValidatedUserID(String validatedUserID) { String getAddress(); + /** +* Notice the address in AMQP or other protocols +* is only effective if the message is copied or reencoded. --- End diff -- no.. not any more... will remove this. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191924187 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -604,26 +607,39 @@ public String getAddress() { return addressSimpleString == null ? null : addressSimpleString.toString(); } + + public SimpleString cachedAddressSimpleString(String address) { + return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools); + } + @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + setAddress(cachedAddressSimpleString(address)); return this; } @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + createExtraProperties().putSimpleStringProperty(ADDRESS_PROPERTY, address); return this; } @Override public SimpleString getAddressSimpleString() { if (address == null) { - Properties properties = getProtonMessage().getProperties(); - if (properties != null) { -setAddress(properties.getTo()); - } else { -return null; + + address = createExtraProperties().getSimpleStringProperty(ADDRESS_PROPERTY); + + if (address != null) { +return address; + } + + + Properties properties = getProperties(); + if (properties != null && properties.getTo() != null) { +address = cachedAddressSimpleString(properties.getTo()); +return address; --- End diff -- I'd typically let it fall through here too, though in this case it really doesn't look that out of place considering the prior null check and exit above it already from checking the extra properties. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191922206 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -604,26 +607,39 @@ public String getAddress() { return addressSimpleString == null ? null : addressSimpleString.toString(); } + + public SimpleString cachedAddressSimpleString(String address) { + return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools); + } + @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + setAddress(cachedAddressSimpleString(address)); return this; } @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + createExtraProperties().putSimpleStringProperty(ADDRESS_PROPERTY, address); return this; } @Override public SimpleString getAddressSimpleString() { if (address == null) { - Properties properties = getProtonMessage().getProperties(); - if (properties != null) { -setAddress(properties.getTo()); - } else { -return null; + --- End diff -- Some extra lines of whitespace here below the next if. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191921539 --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java --- @@ -355,10 +355,27 @@ default Message setValidatedUserID(String validatedUserID) { String getAddress(); + /** +* Notice the address in AMQP or other protocols +* is only effective if the message is copied or reencoded. --- End diff -- Is this true now for AMQP messages? Do the 'extra properties' set for the AMQP message actually influence the 'to' address anymore? ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191918917 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -604,26 +607,39 @@ public String getAddress() { return addressSimpleString == null ? null : addressSimpleString.toString(); } + + public SimpleString cachedAddressSimpleString(String address) { + return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools); + } + @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + setAddress(cachedAddressSimpleString(address)); return this; } @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + createExtraProperties().putSimpleStringProperty(ADDRESS_PROPERTY, address); return this; } @Override public SimpleString getAddressSimpleString() { if (address == null) { - Properties properties = getProtonMessage().getProperties(); - if (properties != null) { -setAddress(properties.getTo()); - } else { -return null; + + address = createExtraProperties().getSimpleStringProperty(ADDRESS_PROPERTY); + + if (address != null) { +return address; + } + + + Properties properties = getProperties(); + if (properties != null && properties.getTo() != null) { +address = cachedAddressSimpleString(properties.getTo()); +return address; --- End diff -- i personally prefer Single Entry, Single Exit, but as you note its all personal pref :), no worries if you meant to do it on purpose, just was checking it wasn't by mistake. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191917895 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -604,26 +607,39 @@ public String getAddress() { return addressSimpleString == null ? null : addressSimpleString.toString(); } + + public SimpleString cachedAddressSimpleString(String address) { + return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools); + } + @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + setAddress(cachedAddressSimpleString(address)); return this; } @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + createExtraProperties().putSimpleStringProperty(ADDRESS_PROPERTY, address); return this; } @Override public SimpleString getAddressSimpleString() { if (address == null) { - Properties properties = getProtonMessage().getProperties(); - if (properties != null) { -setAddress(properties.getTo()); - } else { -return null; + + address = createExtraProperties().getSimpleStringProperty(ADDRESS_PROPERTY); + + if (address != null) { +return address; + } + + + Properties properties = getProperties(); + if (properties != null && properties.getTo() != null) { +address = cachedAddressSimpleString(properties.getTo()); +return address; --- End diff -- I like having it explicitly as it shows my real intention. Other than my ocd level not really needed. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191915585 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -604,26 +607,39 @@ public String getAddress() { return addressSimpleString == null ? null : addressSimpleString.toString(); } + + public SimpleString cachedAddressSimpleString(String address) { + return CoreMessageObjectPools.cachedAddressSimpleString(address, coreMessageObjectPools); + } + @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + setAddress(cachedAddressSimpleString(address)); return this; } @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + createExtraProperties().putSimpleStringProperty(ADDRESS_PROPERTY, address); return this; } @Override public SimpleString getAddressSimpleString() { if (address == null) { - Properties properties = getProtonMessage().getProperties(); - if (properties != null) { -setAddress(properties.getTo()); - } else { -return null; + + address = createExtraProperties().getSimpleStringProperty(ADDRESS_PROPERTY); + + if (address != null) { +return address; + } + + + Properties properties = getProperties(); + if (properties != null && properties.getTo() != null) { +address = cachedAddressSimpleString(properties.getTo()); +return address; --- End diff -- Is this return needed, given that the last statement returns the address anyway? ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191797149 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -606,22 +606,36 @@ public String getAddress() { @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + internalSetAddress(address); + setProtonAddress(address); --- End diff -- Why not make this create a SimpleString and then invoke setAddress(SimpleString), thereby not needing the two new methods and keeping the logic encapsulated in setAddress(SimpleString)? ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191796188 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -606,22 +606,36 @@ public String getAddress() { @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + internalSetAddress(address); + setProtonAddress(address); return this; } + private void internalSetAddress(String address) { + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + } + @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + setProtonAddress(address.toString()); --- End diff -- If the user calls reencode in a plugin, the payload will certainly be affected. I'm changing setAddress to use extraProperties, which will clear up any confusion about this. It wasn't the intent of this fix, but I will do it here. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191792711 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -606,22 +606,36 @@ public String getAddress() { @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + internalSetAddress(address); + setProtonAddress(address); return this; } + private void internalSetAddress(String address) { + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + } + @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + setProtonAddress(address.toString()); --- End diff -- I must be misinterpreting things then. The setAddress methods (again, there are two) appear to be used already in several places of note within the broker. For example, I interpret this bit as meaning every AMQP message received on a link with a target address will have it called on it. Is that the case? https://github.com/apache/activemq-artemis/blob/2.6.0/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java#L448 If so, doesn't that mean that anyone reencoding the message for any reason, even if they have changed nothing, might find the payload changes due to the setAddress side effects? ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191779067 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -606,22 +606,36 @@ public String getAddress() { @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + internalSetAddress(address); + setProtonAddress(address); return this; } + private void internalSetAddress(String address) { + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + } + @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + setProtonAddress(address.toString()); --- End diff -- If the user doesn't want to change the address, they can just leave the message alone. If the user calls reencode, it will be the user's responsibility to make the change. I don't think we should be that strict. We don't change the address unless the message is being copied. The user has the option to do it themselves. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191778368 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -606,22 +606,36 @@ public String getAddress() { @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + internalSetAddress(address); + setProtonAddress(address); return this; } + private void internalSetAddress(String address) { + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + } + @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + setProtonAddress(address.toString()); --- End diff -- @gemmellr notice this has been the case for a while.. there was a bug on setAddress which was affecting customers. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r19182 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -606,22 +606,36 @@ public String getAddress() { @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + internalSetAddress(address); + setProtonAddress(address); return this; } + private void internalSetAddress(String address) { + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + } + @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + setProtonAddress(address.toString()); --- End diff -- > this is used on expiry or DLQ. you could argue those are copied messages and new messages. I can see that view can be argued, though I dont personally take that view myself. Thats not really the main thing that concerned me. > I had the option to use extraProperties which only transverses through the broker... however there are implications on protocol conversions that could open further possibilities for bugs. this was the simpler change given it's copying the message. > > Notice that setAddress will not be effective unless the message is reencoded.. so it only works on Expiry or DLQ. I think this opens cases for other general bugs and protocol violations too. Having since discussed a bit with @mtaylor and looked myself, the setAddress methods (there are two, you only added javadoc for the lesser used one) seem to be used in a number of important places, which is what I was concerned about given the new side effect it has. 
I hadn't twigged on the need to explicitly reencode, but in general I think it's a bad idea for the in-memory representation of the message to differ from the sent payload, as it seems it may now in many cases. That also means when anyone does do something and reencodes (e.g. in a divert) they may get side effects they had no part in. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191760666 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -606,22 +606,36 @@ public String getAddress() { @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + internalSetAddress(address); + setProtonAddress(address); return this; } + private void internalSetAddress(String address) { + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + } + @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + setProtonAddress(address.toString()); --- End diff -- I added javadoc on Message.setAddress ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191758985 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -606,22 +606,36 @@ public String getAddress() { @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + internalSetAddress(address); + setProtonAddress(address); return this; } + private void internalSetAddress(String address) { + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + } + @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + setProtonAddress(address.toString()); --- End diff -- This is used on expiry or DLQ. You could argue those are copied messages and new messages. We actually copy the message during the DLQ or Expiry transfer. I had the option to use extraProperties, which only traverses through the broker; however, there are implications on protocol conversions that could open further possibilities for bugs. This was the simpler change given it's copying the message. Notice that setAddress will not be effective unless the message is reencoded, so it only works on Expiry or DLQ. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191757281 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(value = Parameterized.class) +public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { + + private static final int NUMBER_OF_SERVERS = 2; + private static final SimpleString queueName = SimpleString.toSimpleString("queues.0"); + + + // I'm taking any number that /2 = Odd + // to avoid perfect roundings and making sure messages are evenly distributed + private static final int NUMBER_OF_MESSAGES = 77 * 2; + + + @Parameterized.Parameters(name = "protocol={0}") + public static Collection 
getParameters() { + return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}}); + } + + @Parameterized.Parameter(0) + public String protocol; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + } + + private void startServers(MessageLoadBalancingType loadBalancingType) throws Exception { + setupServers(); + + setRedistributionDelay(0); + + setupCluster(loadBalancingType); + + AddressSettings as = new AddressSettings().setRedistributionDelay(0).setExpiryAddress(SimpleString.toSimpleString("queues.expiry")); + + getServer(0).getAddressSettingsRepository().addMatch("queues.*", as); + getServer(1).getAddressSettingsRepository().addMatch("queues.*", as); + + startServers(0); + startServers(1); + + createQueue(SimpleString.toSimpleString("queues.expiry")); + createQueue(queueName); + } + + private void createQueue(SimpleString queueName) throws Exception { + servers[0].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true); + servers[1].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true); + } + + protected boolean isNetty() { + return true; + } + + private
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191756950 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -1261,6 +1275,9 @@ public String toString() { ", messageID=" + getMessageID() + ", address=" + getAddress() + ", size=" + getEncodeSize() + + ", ApplicationProperties=" + getApplicationProperties() + --- End diff -- thanks :) ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191680587 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(value = Parameterized.class) +public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { + + private static final int NUMBER_OF_SERVERS = 2; + private static final SimpleString queueName = SimpleString.toSimpleString("queues.0"); + + + // I'm taking any number that /2 = Odd + // to avoid perfect roundings and making sure messages are evenly distributed + private static final int NUMBER_OF_MESSAGES = 77 * 2; + + + @Parameterized.Parameters(name = "protocol={0}") + public static Collection 
getParameters() { + return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}}); + } + + @Parameterized.Parameter(0) + public String protocol; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + } + + private void startServers(MessageLoadBalancingType loadBalancingType) throws Exception { + setupServers(); + + setRedistributionDelay(0); + + setupCluster(loadBalancingType); + + AddressSettings as = new AddressSettings().setRedistributionDelay(0).setExpiryAddress(SimpleString.toSimpleString("queues.expiry")); + + getServer(0).getAddressSettingsRepository().addMatch("queues.*", as); + getServer(1).getAddressSettingsRepository().addMatch("queues.*", as); + + startServers(0); + startServers(1); + + createQueue(SimpleString.toSimpleString("queues.expiry")); + createQueue(queueName); + } + + private void createQueue(SimpleString queueName) throws Exception { + servers[0].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true); + servers[1].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true); + } + + protected boolean isNetty() { + return true; + } + + private
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191714303 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -606,22 +606,36 @@ public String getAddress() { @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + internalSetAddress(address); + setProtonAddress(address); --- End diff -- Same question/comment as below for the SimpleString setAddress method. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191715800 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -606,22 +606,36 @@ public String getAddress() { @Override public AMQPMessage setAddress(String address) { - this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + internalSetAddress(address); + setProtonAddress(address); return this; } + private void internalSetAddress(String address) { + this.address = SimpleString.toSimpleString(address, coreMessageObjectPools == null ? null : coreMessageObjectPools.getAddressStringSimpleStringPool()); + } + @Override public AMQPMessage setAddress(SimpleString address) { this.address = address; + setProtonAddress(address.toString()); --- End diff -- Where all is the outer setAddress method used? The AMQP Properties section is part of the immutable bare message so we shouldn't in general be setting the 'to' address in it or creating the section if they weren't present. Exception might be made during cases like protocol conversion, but it seems like it should be explicit rather than a side effect that might see unintended use as here. ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user gemmellr commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191721270 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/crossprotocol/ProtocolsMessageLoadBalancingTest.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.cluster.crossprotocol; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.util.Arrays; +import java.util.Collection; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.cluster.MessageFlowRecord; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory; +import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(value = Parameterized.class) +public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase { + + private static final int NUMBER_OF_SERVERS = 2; + private static final SimpleString queueName = SimpleString.toSimpleString("queues.0"); + + + // I'm taking any number that /2 = Odd + // to avoid perfect roundings and making sure messages are evenly distributed + private static final int NUMBER_OF_MESSAGES = 77 * 2; + + + @Parameterized.Parameters(name = "protocol={0}") + public static Collection 
getParameters() { + return Arrays.asList(new Object[][]{{"AMQP"}, {"CORE"}}); + } + + @Parameterized.Parameter(0) + public String protocol; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + } + + private void startServers(MessageLoadBalancingType loadBalancingType) throws Exception { + setupServers(); + + setRedistributionDelay(0); + + setupCluster(loadBalancingType); + + AddressSettings as = new AddressSettings().setRedistributionDelay(0).setExpiryAddress(SimpleString.toSimpleString("queues.expiry")); + + getServer(0).getAddressSettingsRepository().addMatch("queues.*", as); + getServer(1).getAddressSettingsRepository().addMatch("queues.*", as); + + startServers(0); + startServers(1); + + createQueue(SimpleString.toSimpleString("queues.expiry")); + createQueue(queueName); + } + + private void createQueue(SimpleString queueName) throws Exception { + servers[0].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true); + servers[1].createQueue(queueName, RoutingType.ANYCAST, queueName, (SimpleString) null, (SimpleString) null, true, false, false, false, false, -1, false, false, false, true); + } + + protected boolean isNetty() { + return true; + } + + private
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
Github user michaelandrepearce commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2115#discussion_r191635530 --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java --- @@ -1261,6 +1275,9 @@ public String toString() { ", messageID=" + getMessageID() + ", address=" + getAddress() + ", size=" + getEncodeSize() + + ", ApplicationProperties=" + getApplicationProperties() + --- End diff -- nit: lowercase "a" - applicationProperties ---
[GitHub] activemq-artemis pull request #2115: ARTEMIS-1858 Expiry messages are not tr...
GitHub user clebertsuconic opened a pull request: https://github.com/apache/activemq-artemis/pull/2115 ARTEMIS-1858 Expiry messages are not transversing clustering with AMQP You can merge this pull request into a Git repository by running: $ git pull https://github.com/clebertsuconic/activemq-artemis redistribution Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2115.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2115 commit 756ee8a07a2924160e136349fb0a67dcdc622859 Author: Clebert Suconic Date: 2018-05-30T02:12:58Z ARTEMIS-1858 Expiry messages are not transversing clustering with AMQP ---