ARTEMIS-1391 embedding 2 MQTT brokers is broken
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/144dbadc Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/144dbadc Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/144dbadc Branch: refs/heads/master Commit: 144dbadcb590344b89e2e81cc4a1141e5a27f22d Parents: 53c8ee0 Author: Jens Reimann <jreim...@redhat.com> Authored: Thu Sep 7 17:23:09 2017 +0200 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Mon Sep 25 11:00:56 2017 -0400 ---------------------------------------------------------------------- .../protocol/mqtt/MQTTConnectionManager.java | 9 +--- .../core/protocol/mqtt/MQTTProtocolManager.java | 9 ++++ .../integration/mqtt/imported/MQTTFQQNTest.java | 6 --- .../imported/MQTTInterceptorPropertiesTest.java | 18 +++---- .../integration/mqtt/imported/MQTTTest.java | 54 +++++++++++++++++--- .../integration/plugin/MqttPluginTest.java | 20 -------- 6 files changed, 64 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index 02e1c66..79b97a3 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.core.protocol.mqtt; -import java.util.Set; import java.util.UUID; import io.netty.buffer.ByteBuf; @@ -29,7 +28,6 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; import org.apache.activemq.artemis.utils.UUIDGenerator; -import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; /** * MQTTConnectionManager is responsible for handle Connect and Disconnect packets and any resulting behaviour of these @@ -39,9 +37,6 @@ public class MQTTConnectionManager { private MQTTSession session; - //TODO Read in a list of existing client IDs from stored Sessions. - public static Set<String> CONNECTED_CLIENTS = new ConcurrentHashSet<>(); - private MQTTLogger log = MQTTLogger.LOGGER; private boolean isWill = false; @@ -149,7 +144,7 @@ public class MQTTConnectionManager { session.getSessionState().setAttached(false); String clientId = session.getSessionState().getClientId(); if (clientId != null) { - CONNECTED_CLIENTS.remove(clientId); + session.getProtocolManager().getConnectedClients().remove(clientId); } } } @@ -181,7 +176,7 @@ public class MQTTConnectionManager { // [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null return null; } - } else if (!CONNECTED_CLIENTS.add(clientId)) { + } else if (!session.getProtocolManager().getConnectedClients().add(clientId)) { // ^^^ If the client ID is not unique (i.e. it has already registered) then do not accept it. http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java index 6118b0d..8ee4033 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -38,6 +39,7 @@ import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; /** * MQTTProtocolManager @@ -52,6 +54,9 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<>(); private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>(); + //TODO Read in a list of existing client IDs from stored Sessions. + private Set<String> connectedClients = new ConcurrentHashSet<>(); + MQTTProtocolManager(ActiveMQServer server, List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) { @@ -172,4 +177,8 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInter public void invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) { super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection); } + + public Set<String> getConnectedClients() { + return connectedClients; + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java index 4f0b229..acbf5d7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTFQQNTest.java @@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; -import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; import org.apache.activemq.artemis.core.server.QueueQueryResult; -import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -44,10 +42,6 @@ public class MQTTFQQNTest extends MQTTTestSupport { Field sessions = MQTTSession.class.getDeclaredField("SESSIONS"); sessions.setAccessible(true); sessions.set(null, new ConcurrentHashMap<>()); - - Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS"); - connectedClients.setAccessible(true); - connectedClients.set(null, new ConcurrentHashSet<>()); super.setUp(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java index 2600952..c95a462 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTInterceptorPropertiesTest.java @@ -16,27 +16,25 @@ */ package org.apache.activemq.artemis.tests.integration.mqtt.imported; +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.felix.resolver.util.ArrayMap; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ErrorCollector; -import java.lang.reflect.Field; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - public class MQTTInterceptorPropertiesTest extends MQTTTestSupport { @Override @@ -45,10 +43,6 @@ public class MQTTInterceptorPropertiesTest extends MQTTTestSupport { Field sessions = MQTTSession.class.getDeclaredField("SESSIONS"); sessions.setAccessible(true); sessions.set(null, new ConcurrentHashMap<>()); - - Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS"); - connectedClients.setAccessible(true); - connectedClients.set(null, new ConcurrentHashSet<>()); super.setUp(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index e3c4856..9087938 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -40,15 +40,15 @@ import java.util.regex.Pattern; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.CoreAddressConfiguration; import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; -import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.util.Wait; -import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpMessage; @@ -85,12 +85,7 @@ public class MQTTTest extends MQTTTestSupport { Field sessions = MQTTSession.class.getDeclaredField("SESSIONS"); sessions.setAccessible(true); sessions.set(null, new ConcurrentHashMap<>()); - - Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS"); - connectedClients.setAccessible(true); - connectedClients.set(null, new ConcurrentHashSet<>()); super.setUp(); - } @Test @@ -1990,4 +1985,49 @@ public class MQTTTest extends MQTTTestSupport { } assertTrue(e.getMessage().contains("CONNECTION_REFUSED_IDENTIFIER_REJECTED")); } + + @Test + public void testDoubleBroker() throws Exception { + /* + * Start two embedded server instances for MQTT and connect to them + * with the same MQTT client id. As those are two different instances + * connecting to them with the same client ID must succeed. + */ + + final int port1 = 1884; + final int port2 = 1885; + + final Configuration cfg1 = createDefaultConfig(1, false); + cfg1.addAcceptorConfiguration("mqtt1", "tcp://localhost:" + port1 + "?protocols=MQTT"); + + final Configuration cfg2 = createDefaultConfig(2, false); + cfg2.addAcceptorConfiguration("mqtt2", "tcp://localhost:" + port2 + "?protocols=MQTT"); + + final ActiveMQServer server1 = createServer(cfg1); + server1.start(); + final ActiveMQServer server2 = createServer(cfg2); + server2.start(); + + final String clientId = "client1"; + final MQTT mqtt1 = createMQTTConnection(clientId, true); + final MQTT mqtt2 = createMQTTConnection(clientId, true); + + mqtt1.setHost("localhost", port1); + mqtt2.setHost("localhost", port2); + + final BlockingConnection connection1 = mqtt1.blockingConnection(); + final BlockingConnection connection2 = mqtt2.blockingConnection(); + + try { + connection1.connect(); + connection2.connect(); + } catch (Exception e) { + fail("Connections should have worked."); + } finally { + if (connection1.isConnected()) + connection1.disconnect(); + if (connection2.isConnected()) + connection2.disconnect(); + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/144dbadc/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java index 660df34..2365ae5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/plugin/MqttPluginTest.java @@ -16,20 +16,14 @@ */ package org.apache.activemq.artemis.tests.integration.plugin; -import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; -import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider; import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport; -import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; -import org.junit.Before; import org.junit.Test; import static org.apache.activemq.artemis.tests.integration.plugin.MethodCalledVerifier.AFTER_CLOSE_CONSUMER; @@ -62,20 +56,6 @@ public class MqttPluginTest extends MQTTTestSupport { private final MethodCalledVerifier verifier = new MethodCalledVerifier(methodCalls); @Override - @Before - public void setUp() throws Exception { - Field sessions = MQTTSession.class.getDeclaredField("SESSIONS"); - sessions.setAccessible(true); - sessions.set(null, new ConcurrentHashMap<>()); - - Field connectedClients = MQTTConnectionManager.class.getDeclaredField("CONNECTED_CLIENTS"); - connectedClients.setAccessible(true); - connectedClients.set(null, new ConcurrentHashSet<>()); - super.setUp(); - - } - - @Override public void configureBroker() throws Exception { super.configureBroker(); server.registerBrokerPlugin(verifier);