[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user jbertram commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2466#discussion_r246563323 --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java --- @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.mqtt.imported; + +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.postoffice.Binding; +import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.Queue; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +public class MQTTQueueCleanTest extends MQTTTestSupport { + + private static final ActiveMQServerLogger log = ActiveMQServerLogger.LOGGER; + + @Test + public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() throws Exception { + Random random = new Random(); + Set clientProviders = new HashSet<>(11); + int repeatCount = 0; + String address = "clean/test"; + String clientId = "sameClientId"; + String queueName = "::sameClientId.clean.test"; + //The abnormal scene does not necessarily occur, repeating 100 times to ensure the recurrence of the abnormality + while (repeatCount < 100) { + repeatCount++; + int subConnectionCount = random.nextInt(50) + 1; + int sC = 0; + try { +//Reconnect at least twice to reproduce the problem +while (sC < subConnectionCount) { + sC++; + MQTTClientProvider clientProvider = getMQTTClientProvider(); + clientProvider.setClientId(clientId); + initializeConnection(clientProvider); + clientProviders.add(clientProvider); + clientProvider.subscribe(address, AT_LEAST_ONCE); +} + } finally { +for (MQTTClientProvider clientProvider : clientProviders) { + clientProvider.disconnect(); +} +clientProviders.clear(); +assertTrue(waitForBindings(server, queueName, false, 0, 0, 1)); --- End diff -- From what I can tell the overridden version of `waitForBindings` isn't necessary. You could just use something like: `assertTrue(Wait.waitFor(() -> server.locateQueue(SimpleString.toSimpleString(queueName)) == null));` ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user jbertram commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2466#discussion_r246562512 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java --- @@ -3604,4 +3605,14 @@ private void deployReloadableConfigFromConfiguration() throws Exception { return externalComponents; } + @Override + public Set queueConsumersQuery(SimpleString queueName) { --- End diff -- This method isn't necessary. You can use the `locateQueue` method and simply invoke `getConsumers` on the returned value. ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
GitHub user onlyMIT reopened a pull request: https://github.com/apache/activemq-artemis/pull/2466 ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be cle⦠### Test environment 1. Use 10,000 (9 thousand senders, 1 thousand consumers) MQTT connection on one server to test Artemis, set the 'cleanSession' property to trueï¼ 2. MQTT client: paho 1.2.0; 3. Server: CPU Cores:32, Mem:64G, SSD:250G, HDD:1T ### Issue **Issue 1** Artemis broker has the following exception log: `[Thread-0 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=fb358579-feb3-11e8-bc7c-141877a7fd13-1409545055)] 17:27:59,035 WARN [org.apache.activemq.artemis.utils.actors.OrderedExecutor] null: java.lang.NullPointerException at org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager.isClientConnected(MQTTProtocolManager.java:182) [:] at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager.disconnect(MQTTConnectionManager.java:150) [:] at org.apache.activemq.artemis.core.protocol.mqtt.MQTTFailureListener.connectionFailed(MQTTFailureListener.java:37) [:] at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection.fail(MQTTConnection.java:147) [:] at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.issueFailure(RemotingServiceImpl.java:561) [:] at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.connectionDestroyed(RemotingServiceImpl.java:542) [:] at org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor$Listener.connectionDestroyed(NettyAcceptor.java:858) [:] at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.lambda$channelInactive$0(ActiveMQChannelHandler.java:83) [:] at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [:] at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [:] at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66) [:] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_101] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_101] at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [:]` **Issue 2** After closing all client connections, 64 queues were not cleaned upã ### Analysis and simulation reproduction When the MQTT consumer client (cleanSession property set to true) reconnected,There is a certain probability that the queue will not be automatically cleared and a NullPointerException will be thrown. This is because the MQTT consumer client thinks that its connection has been disconnected and triggers reconnection, but the MQTT connection is still alive at Artemis broker. This bug occurs when the Artemis broker to start processing a ânew MQTT connectionâ while closing the âold MQTT connectionâ. Create an MQTT consumer (cleanSession: true, clientID: superConsumer, topic: mit.test) and connect to the Artemis broker. Create another MQTT consumer to set the same cleanSession, clientID, and topic, then start connecting with the Artemis broker. Close the two MQTT connections, and so many times after repeated trials, there is a probability to reproduce the two problems mentioned above. ### Solution **Issue 1** When 'session.getProtocolManager().isClientConnected(clientId, session.getConnection())' is called, if the 'MQTTConnection' instance retrieved from 'connectedClients' is 'null', a NullPointerException is thrown. Add a non-null decision in the 'MQTTProtocolManager.isClientConnected' method. **Issue 2** 1. Remove âInterruptedExceptionâ from the âMQTTConnectionManager.getSessionStateâ method because the âInterruptedExceptionâ exception will never be thrown in this method; 2. 'MQTTConnectionManager.connect' and 'MQTTConnectionManager.disconnect' methods add 'synchronized' with the MQTTSessionState instance as a lock.In the Artemis broker, all MQTT connections using the same clientId share the same MQTTSessionState instance. After adding this lock, you can avoid calling the 'connect' and 'disconnect' methods on the MQTT connections with the same clientId. 3. For the MQTT protocol, there is one and only one consumer connection per queue, which is a good choice for closing the old MQTT consumer before the new MQTT consumer connects. The original code could not effectively clean up the 'old consumer' in the queue when the 'new MQTT connection' was connected to the Artemis broker. Modify âMQTTSubscriptionManager.removeSubscriptionâ to get the queue consumer collection from
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user onlyMIT closed the pull request at: https://github.com/apache/activemq-artemis/pull/2466 ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2466#discussion_r242583621 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java --- @@ -113,32 +113,7 @@ import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.security.impl.SecurityStoreImpl; -import org.apache.activemq.artemis.core.server.ActivateCallback; --- End diff -- Thanks, this is my IDE problem, I will pay attention to it. ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2466#discussion_r242582946 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java --- @@ -194,12 +191,13 @@ private synchronized void removeSubscription(String address) throws Exception { SimpleString internalQueueName = getQueueNameForTopic(internalAddress); session.getSessionState().removeSubscription(address); - - ServerConsumer consumer = consumers.get(address); + Set queueConsumers = session.getServer().queueConsumersQuery(internalQueueName); --- End diff -- In the âqueueConsumersQueryâ method, use 'queueName' to query 'Binding' and get a queue through âBindingâ. I think I am getting all the consumers in the specified queue, not all consumers at the same address.You can check the details of the 'queueConsumersQuery' method. In the âActiveMQServerImpl.destroyQueueâ method, the number of consumers on a queue is also obtained in this way.Originally I wanted to put the shutdown consumer code here, and later found that there were multiple calls to the âActiveMQServerImpl.destroyQueueâ method, and I abandoned the idea of putting the consumer code off here. ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2466#discussion_r242541448 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java --- @@ -194,12 +191,13 @@ private synchronized void removeSubscription(String address) throws Exception { SimpleString internalQueueName = getQueueNameForTopic(internalAddress); session.getSessionState().removeSubscription(address); - - ServerConsumer consumer = consumers.get(address); + Set queueConsumers = session.getServer().queueConsumersQuery(internalQueueName); --- End diff -- I think this is wrong. You should only check for consumers on this Session. Your logic is closing every consumer to the same address. You keep multiple connections and you have a failure. ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user clebertsuconic commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2466#discussion_r242540859 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java --- @@ -113,32 +113,7 @@ import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.security.impl.SecurityStoreImpl; -import org.apache.activemq.artemis.core.server.ActivateCallback; --- End diff -- This will fail checkstyle. no * imports ---