This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new d02da18 ARTEMIS-2506 MQTT doesn't cleanup underlying connection for bad clients new 4925040 This closes #2853 d02da18 is described below commit d02da18dab71353bf2f4eae0b1038e05d9b9d0ce Author: Howard Gao <howard....@gmail.com> AuthorDate: Fri Sep 27 19:41:15 2019 +0800 ARTEMIS-2506 MQTT doesn't cleanup underlying connection for bad clients When a bad MQTT client drops its connection without properly closing it, the broker doesn't close the underlying physical connection. --- .../artemis/core/protocol/mqtt/MQTTConnection.java | 1 + .../core/remoting/impl/netty/NettyAcceptor.java | 6 ++ .../mqtt/MQTTConnnectionCleanupTest.java | 85 ++++++++++++++++++++++ .../integration/mqtt/imported/MQTTTestSupport.java | 4 - 4 files changed, 92 insertions(+), 4 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index 9a607f8..6b35aed 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -148,6 +148,7 @@ public class MQTTConnection implements RemotingConnection { for (FailureListener listener : copy) { listener.connectionFailed(me, false); } + transportConnection.close(); } private List<FailureListener> copyFailureListeners() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index b9fc72c..0bc99c6 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -462,6 +462,12 @@ public class NettyAcceptor extends AbstractAcceptor { return name; } + + //for test purpose + public Map<Object, NettyServerConnection> getConnections() { + return connections; + } + // only for testing purposes public void setKeyStorePath(String keyStorePath) { this.keyStorePath = keyStorePath; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTConnnectionCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTConnnectionCleanupTest.java new file mode 100644 index 0000000..c5486eb --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTConnnectionCleanupTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.artemis.tests.integration.mqtt; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport; +import org.apache.activemq.artemis.utils.Wait; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class MQTTConnnectionCleanupTest extends MQTTTestSupport { + + @Override + protected void addMQTTConnector() { + + Map<String, Object> params = new HashMap<>(); + params.put(TransportConstants.PORT_PROP_NAME, "" + port); + params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT"); + params.put(TransportConstants.CONNECTIONS_ALLOWED, 1); + params.put(TransportConstants.HOST_PROP_NAME, "localhost"); + + TransportConfiguration mqtt = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, "MQTT"); + + server.getConfiguration().addAcceptorConfiguration(mqtt); + } + + @Test(timeout = 30 * 1000) + public void testBadClient() throws Exception { + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(""); + mqtt.setCleanSession(true); + BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + + try { + connection = mqtt.blockingConnection(); + connection.connect(); + fail("second connection shouldn't be allowed"); + } catch (Exception e) { + //ignore. 
+ } + + NettyAcceptor acceptor = (NettyAcceptor) server.getRemotingService().getAcceptor("MQTT"); + assertEquals(1, acceptor.getConnections().size()); + + //now simulate a bad client by manually fail the server connection + RemotingConnection conn = server.getRemotingService().getConnections().iterator().next(); + + assertTrue(conn instanceof MQTTConnection); + + conn.fail(new ActiveMQException("testBadClient")); + + Wait.assertEquals(0, ()->acceptor.getConnections().size()); + + //another connection should be ok + connection = mqtt.blockingConnection(); + connection.connect(); + connection.disconnect(); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index 3ea4c5b..cae6b1d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -243,10 +243,6 @@ public class MQTTTestSupport extends ActiveMQTestBase { // MQTT transport connectors as needed, the port variable is always supposed to be // assigned the primary MQTT connector's port. - Map<String, Object> params = new HashMap<>(); - params.put(TransportConstants.PORT_PROP_NAME, "" + port); - params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT"); - server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:"); LOG.info("Added connector {} to broker", getProtocolScheme());