[GitHub] activemq-artemis issue #2485: ARTEMIS-2217 ‘MQTTSessionState’ in the ‘...
Github user onlyMIT commented on the issue: https://github.com/apache/activemq-artemis/pull/2485 @jbertram OKï¼I will close this RPãThere is a small suggestion, can you keep the "getSessionState" methodï¼because the RP #2466 has used this methodã In the case of not sure if other RPs are used in the corresponding method, and it is not necessary to delete the method. Maybe it is better to keep this method. Anyway I will close this RP, #2491 may be more readable ---
[GitHub] activemq-artemis pull request #2485: ARTEMIS-2217 ‘MQTTSessionState’ in ...
Github user onlyMIT closed the pull request at: https://github.com/apache/activemq-artemis/pull/2485 ---
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2491#discussion_r246247094 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java --- @@ -117,14 +118,11 @@ boolean getStopped() { } boolean isClean() { - return isClean; + return clean; } - void setIsClean(boolean isClean) throws Exception { - this.isClean = isClean; - if (isClean) { - clean(); - } + void setClean(boolean clean) throws Exception { + this.clean = clean; --- End diff -- Look good to me! ---
[GitHub] activemq-artemis issue #2493: ARTEMIS-2223 when a new consumer is created, n...
Github user onlyMIT commented on the issue: https://github.com/apache/activemq-artemis/pull/2493 The test is correct.can close the jira and pull request ---
[GitHub] activemq-artemis pull request #2493: ARTEMIS-2223 when a new consumer is cre...
Github user onlyMIT closed the pull request at: https://github.com/apache/activemq-artemis/pull/2493 ---
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2491#discussion_r246095670 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java --- @@ -117,14 +118,11 @@ boolean getStopped() { } boolean isClean() { - return isClean; + return clean; } - void setIsClean(boolean isClean) throws Exception { - this.isClean = isClean; - if (isClean) { - clean(); - } + void setClean(boolean clean) throws Exception { + this.clean = clean; --- End diff -- @jbertram in the getSessionState method.Only clear stateï¼not call 'clean()' method. In fact, the queue is not cleaned up. I use the code for the âpahoâ test. The first consumer "cleanSession=false", using a different clientID to open a producer to send a message. Close the producer and consumer, use the same clientID and cleanSession = true" to open the second consumer and find that the consumer will consume the legacy message in the queueãSo I suspect that there is a problem with the test code. I am always looking for why my test results will consume the legacy messages in the queue, and your test results will notã After seeing your information, I re-reviewed the code and found that the test code did not have any problems. What is causing my doubts is that because of your change, when cleanSession=true, only the MQTTSessionState is cleaned up, the queue still exists, and the legacy messages in the queue are consumed when resubscribing. Can close [#2493 ](https://github.com/apache/activemq-artemis/pull/2493) . I think you need to review your changeã ---
[GitHub] activemq-artemis issue #2491: ARTEMIS-2217 remove state on clean MQTT sessio...
Github user onlyMIT commented on the issue: https://github.com/apache/activemq-artemis/pull/2491 @jbertram I think I found out why your solution passed the testï¼ your test was constructed on a wrong test code. the solution your provided, after I tested it with the revised test code, the test did not pass. The test issueï¼ i created a jira and opened a [#2493](https://github.com/apache/activemq-artemis/pull/2493) to solve this test issue. we need itï¼ calling "clean()" in the "setIsClean(boolean isClean)" method. ---
[GitHub] activemq-artemis pull request #2493: ARTEMIS-2223 when a new consumer is cre...
GitHub user onlyMIT opened a pull request: https://github.com/apache/activemq-artemis/pull/2493 ARTEMIS-2223 when a new consumer is created, no subscription is called. In the 'MQTTTest.testCleanSession()' test method, when a new consumer is created, no subscription is called.Consumers need to subscribe before they consume the news. You can merge this pull request into a Git repository by running: $ git pull https://github.com/onlyMIT/activemq-artemis ARTEMIS-2223 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2493.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2493 commit 2a6d230c4c8cd739090c5da58754a005d62e1a40 Author: onlyMIT Date: 2019-01-08T09:39:37Z ARTEMIS-2223 when a new consumer is created, no subscription is called. In the 'MQTTTest.testCleanSession()' test method, when a new consumer is created, no subscription is called.Consumers need to subscribe before they consume the news. ---
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2491#discussion_r245879352 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java --- @@ -117,14 +118,11 @@ boolean getStopped() { } boolean isClean() { - return isClean; + return clean; } - void setIsClean(boolean isClean) throws Exception { - this.isClean = isClean; - if (isClean) { - clean(); - } + void setClean(boolean clean) throws Exception { + this.clean = clean; --- End diff -- It is necessary to call the "clean()" method to clean up old session information when creating a connection. If it is not cleaned up, when the cleanSession of the last MQTT consumer is false, and the cleanSession of the connected MQTT consumer is true, the message in the old queue will be consumed, which is actually not allowed. I think this is why calling "clean()" in the "setIsClean(boolean isClean)" method ---
[GitHub] activemq-artemis pull request #2491: ARTEMIS-2217 remove state on clean MQTT...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2491#discussion_r245878622 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java --- @@ -117,14 +118,11 @@ boolean getStopped() { } boolean isClean() { - return isClean; + return clean; } - void setIsClean(boolean isClean) throws Exception { - this.isClean = isClean; - if (isClean) { - clean(); - } + void setClean(boolean clean) throws Exception { + this.clean = clean; --- End diff -- It is necessary to call the "clean()" method to clean up old session information when creating a connection. If it is not cleaned up, when the cleanSession of the last MQTT consumer is false, and the cleanSession of the connected MQTT consumer is true, the message in the old queue will be consumed, which is actually not allowed. I think this is why calling "clean()" in the "setIsClean(boolean isClean)" method ---
[GitHub] activemq-artemis issue #2485: ARTEMIS-2217 ‘MQTTSessionState’ in the ‘...
Github user onlyMIT commented on the issue: https://github.com/apache/activemq-artemis/pull/2485 @jbertram Nice! Really need more specific test ---
[GitHub] activemq-artemis pull request #2485: ARTEMIS-2217 ‘MQTTSessionState’ in ...
GitHub user onlyMIT opened a pull request: https://github.com/apache/activemq-artemis/pull/2485 ARTEMIS-2217 âMQTTSessionStateâ in the âSESSIONS ConcurrentHashMapâ n⦠â¦ever be removed âMQTTSessionStateâ in the âSESSIONS ConcurrentHashMapâ should be removed when the conusmer ï¼cleanSession is trueï¼ connection is closed You can merge this pull request into a Git repository by running: $ git pull https://github.com/onlyMIT/activemq-artemis ARTEMIS-2217 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/activemq-artemis/pull/2485.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2485 commit c4c951c3a2cf74f211d9b7c17cad48f27b725ff5 Author: onlyMIT Date: 2018-12-29T08:53:04Z ARTEMIS-2217 âMQTTSessionStateâ in the âSESSIONS ConcurrentHashMapâ never be removed âMQTTSessionStateâ in the âSESSIONS ConcurrentHashMapâ should be removed when the conusmer ï¼cleanSession is trueï¼ connection is closed ---
[GitHub] activemq-artemis issue #2466: ARTEMIS-2206 The MQTT consumer reconnection ca...
Github user onlyMIT commented on the issue: https://github.com/apache/activemq-artemis/pull/2466 @michaelandrepearce @clebertsuconic Added automated testing. And added locks for sub and unSub operations ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
GitHub user onlyMIT reopened a pull request: https://github.com/apache/activemq-artemis/pull/2466 ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be cle⦠### Test environment 1. Use 10,000 (9 thousand senders, 1 thousand consumers) MQTT connection on one server to test Artemis, set the 'cleanSession' property to trueï¼ 2. MQTT client: paho 1.2.0; 3. Server: CPU Cores:32, Mem:64G, SSD:250G, HDD:1T ### Issue **Issue 1** Artemis broker has the following exception log: `[Thread-0 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=fb358579-feb3-11e8-bc7c-141877a7fd13-1409545055)] 17:27:59,035 WARN [org.apache.activemq.artemis.utils.actors.OrderedExecutor] null: java.lang.NullPointerException at org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager.isClientConnected(MQTTProtocolManager.java:182) [:] at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager.disconnect(MQTTConnectionManager.java:150) [:] at org.apache.activemq.artemis.core.protocol.mqtt.MQTTFailureListener.connectionFailed(MQTTFailureListener.java:37) [:] at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection.fail(MQTTConnection.java:147) [:] at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.issueFailure(RemotingServiceImpl.java:561) [:] at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.connectionDestroyed(RemotingServiceImpl.java:542) [:] at org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor$Listener.connectionDestroyed(NettyAcceptor.java:858) [:] at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.lambda$channelInactive$0(ActiveMQChannelHandler.java:83) [:] at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [:] at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [:] at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66) [:] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_101] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_101] at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [:]` **Issue 2** After closing all client connections, 64 queues were not cleaned upã ### Analysis and simulation reproduction When the MQTT consumer client (cleanSession property set to true) reconnected,There is a certain probability that the queue will not be automatically cleared and a NullPointerException will be thrown. This is because the MQTT consumer client thinks that its connection has been disconnected and triggers reconnection, but the MQTT connection is still alive at Artemis broker. This bug occurs when the Artemis broker to start processing a ânew MQTT connectionâ while closing the âold MQTT connectionâ. Create an MQTT consumer (cleanSession: true, clientID: superConsumer, topic: mit.test) and connect to the Artemis broker. Create another MQTT consumer to set the same cleanSession, clientID, and topic, then start connecting with the Artemis broker. Close the two MQTT connections, and so many times after repeated trials, there is a probability to reproduce the two problems mentioned above. ### Solution **Issue 1** When 'session.getProtocolManager().isClientConnected(clientId, session.getConnection())' is called, if the 'MQTTConnection' instance retrieved from 'connectedClients' is 'null', a NullPointerException is thrown. Add a non-null decision in the 'MQTTProtocolManager.isClientConnected' method. **Issue 2** 1. Remove âInterruptedExceptionâ from the âMQTTConnectionManager.getSessionStateâ method because the âInterruptedExceptionâ exception will never be thrown in this method; 2. 'MQTTConnectionManager.connect' and 'MQTTConnectionManager.disconnect' methods add 'synchronized' with the MQTTSessionState instance as a lock.In the Artemis broker, all MQTT connections using the same clientId share the same MQTTSessionState instance. After adding this lock, you can avoid calling the 'connect' and 'disconnect' methods on the MQTT connections with the same clientId. 3. For the MQTT protocol, there is one and only one consumer connection per queue, which is a good choice for closing the old MQTT consumer before the new MQTT consumer connects. The original code could not effectively clean up the 'old consumer' in the queue when the 'new MQTT connection' was connected to the Artemis broker. Modify âMQTTSubscriptionManager.removeSubscriptionâ to get the queue consumer collection from
[GitHub] activemq-artemis issue #2466: ARTEMIS-2206 The MQTT consumer reconnection ca...
Github user onlyMIT commented on the issue: https://github.com/apache/activemq-artemis/pull/2466 @clebertsuconic Sorry, the pull request has been closed due to my misoperation, and it has now been restored. thank you very much for your help ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user onlyMIT closed the pull request at: https://github.com/apache/activemq-artemis/pull/2466 ---
[GitHub] activemq-artemis issue #2466: ARTEMIS-2206 The MQTT consumer reconnection ca...
Github user onlyMIT commented on the issue: https://github.com/apache/activemq-artemis/pull/2466 @clebertsuconic this is my first pull request on GitHubãThank you for your understanding, I will try to use JIRA to submit the code, if you find the problem, I hope you correct me. ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2466#discussion_r242583621 --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java --- @@ -113,32 +113,7 @@ import org.apache.activemq.artemis.core.security.SecurityAuth; import org.apache.activemq.artemis.core.security.SecurityStore; import org.apache.activemq.artemis.core.security.impl.SecurityStoreImpl; -import org.apache.activemq.artemis.core.server.ActivateCallback; --- End diff -- Thanks, this is my IDE problem, I will pay attention to it. ---
[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...
Github user onlyMIT commented on a diff in the pull request: https://github.com/apache/activemq-artemis/pull/2466#discussion_r242582946 --- Diff: artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java --- @@ -194,12 +191,13 @@ private synchronized void removeSubscription(String address) throws Exception { SimpleString internalQueueName = getQueueNameForTopic(internalAddress); session.getSessionState().removeSubscription(address); - - ServerConsumer consumer = consumers.get(address); + Set queueConsumers = session.getServer().queueConsumersQuery(internalQueueName); --- End diff -- In the âqueueConsumersQueryâ method, use 'queueName' to query 'Binding' and get a queue through âBindingâ. I think I am getting all the consumers in the specified queue, not all consumers at the same address.You can check the details of the 'queueConsumersQuery' method. In the âActiveMQServerImpl.destroyQueueâ method, the number of consumers on a queue is also obtained in this way.Originally I wanted to put the shutdown consumer code here, and later found that there were multiple calls to the âActiveMQServerImpl.destroyQueueâ method, and I abandoned the idea of putting the consumer code off here. ---
[GitHub] activemq-artemis issue #2466: NO-JIRA The MQTT consumer reconnection caused ...
Github user onlyMIT commented on the issue: https://github.com/apache/activemq-artemis/pull/2466 @michaelandrepearce JIRA has been created ï¼ https://issues.apache.org/jira/projects/ARTEMIS/issues/ARTEMIS-2206?filter=addedrecently Regarding automated testing, in addition to the simulation method that I said, do you have any better suggestions? ---
[GitHub] activemq-artemis pull request #2466: NO-JIRA The MQTT consumer reconnection ...
GitHub user onlyMIT opened a pull request: https://github.com/apache/activemq-artemis/pull/2466 NO-JIRA The MQTT consumer reconnection caused the queue to not be cle⦠### Test environment 1. Use 10,000 (9 thousand senders, 1 thousand consumers) MQTT connection on one server to test Artemis, set the 'cleanSession' property to trueï¼ 2. MQTT client: paho 1.2.0; 3. Server: CPU Cores:32, Mem:64G, SSD:250G, HDD:1T ### Issue **Issue 1** Artemis broker has the following exception log: `[Thread-0 (ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=fb358579-feb3-11e8-bc7c-141877a7fd13-1409545055)] 17:27:59,035 WARN [org.apache.activemq.artemis.utils.actors.OrderedExecutor] null: java.lang.NullPointerException at org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager.isClientConnected(MQTTProtocolManager.java:182) [:] at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager.disconnect(MQTTConnectionManager.java:150) [:] at org.apache.activemq.artemis.core.protocol.mqtt.MQTTFailureListener.connectionFailed(MQTTFailureListener.java:37) [:] at org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection.fail(MQTTConnection.java:147) [:] at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.issueFailure(RemotingServiceImpl.java:561) [:] at org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.connectionDestroyed(RemotingServiceImpl.java:542) [:] at org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor$Listener.connectionDestroyed(NettyAcceptor.java:858) [:] at org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.lambda$channelInactive$0(ActiveMQChannelHandler.java:83) [:] at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42) [:] at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31) [:] at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66) [:] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [rt.jar:1.8.0_101] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [rt.jar:1.8.0_101] at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118) [:]` **Issue 2** After closing all client connections, 64 queues were not cleaned upã ### Analysis and simulation reproduction When the MQTT consumer client (cleanSession property set to true) reconnected,There is a certain probability that the queue will not be automatically cleared and a NullPointerException will be thrown. This is because the MQTT consumer client thinks that its connection has been disconnected and triggers reconnection, but the MQTT connection is still alive at Artemis broker. This bug occurs when the Artemis broker to start processing a ânew MQTT connectionâ while closing the âold MQTT connectionâ. Create an MQTT consumer (cleanSession: true, clientID: superConsumer, topic: mit.test) and connect to the Artemis broker. Create another MQTT consumer to set the same cleanSession, clientID, and topic, then start connecting with the Artemis broker. Close the two MQTT connections, and so many times after repeated trials, there is a probability to reproduce the two problems mentioned above. ### Solution **Issue 1** When 'session.getProtocolManager().isClientConnected(clientId, session.getConnection())' is called, if the 'MQTTConnection' instance retrieved from 'connectedClients' is 'null', a NullPointerException is thrown. Add a non-null decision in the 'MQTTProtocolManager.isClientConnected' method. **Issue 2** 1. Remove âInterruptedExceptionâ from the âMQTTConnectionManager.getSessionStateâ method because the âInterruptedExceptionâ exception will never be thrown in this method; 2. 'MQTTConnectionManager.connect' and 'MQTTConnectionManager.disconnect' methods add 'synchronized' with the MQTTSessionState instance as a lock.In the Artemis broker, all MQTT connections using the same clientId share the same MQTTSessionState instance. After adding this lock, you can avoid calling the 'connect' and 'disconnect' methods on the MQTT connections with the same clientId. 3. For the MQTT protocol, there is one and only one consumer connection per queue, which is a good choice for closing the old MQTT consumer before the new MQTT consumer connects. The original code could not effectively clean up the 'old consumer' in the queue when the 'new MQTT connection' was connected to the Artemis broker. Modify âMQTTSubscriptionManager.removeSubscriptionâ to get the queue consumer collection from the â