[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...

2019-01-09 Thread jbertram
Github user jbertram commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2466#discussion_r246563323
  
--- Diff: 
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTQueueCleanTest.java
 ---
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+public class MQTTQueueCleanTest extends MQTTTestSupport {
+
+   private static final ActiveMQServerLogger log = 
ActiveMQServerLogger.LOGGER;
+
+   @Test
+   public void testQueueCleanWhenConnectionSynExeConnectAndDisconnect() 
throws Exception {
+  Random random = new Random();
+  Set clientProviders = new HashSet<>(11);
+  int repeatCount = 0;
+  String address = "clean/test";
+  String clientId = "sameClientId";
+  String queueName = "::sameClientId.clean.test";
+  //The abnormal scene does not necessarily occur, repeating 100 times 
to ensure the recurrence of the abnormality
+  while (repeatCount < 100) {
+ repeatCount++;
+ int subConnectionCount = random.nextInt(50) + 1;
+ int sC = 0;
+ try {
+//Reconnect at least twice to reproduce the problem
+while (sC < subConnectionCount) {
+   sC++;
+   MQTTClientProvider clientProvider = getMQTTClientProvider();
+   clientProvider.setClientId(clientId);
+   initializeConnection(clientProvider);
+   clientProviders.add(clientProvider);
+   clientProvider.subscribe(address, AT_LEAST_ONCE);
+}
+ } finally {
+for (MQTTClientProvider clientProvider : clientProviders) {
+   clientProvider.disconnect();
+}
+clientProviders.clear();
+assertTrue(waitForBindings(server, queueName, false, 0, 0, 
1));
--- End diff --

From what I can tell the overridden version of `waitForBindings` isn't 
necessary. You could just use something like:

`assertTrue(Wait.waitFor(() -> 
server.locateQueue(SimpleString.toSimpleString(queueName)) == null));`
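
For readers unfamiliar with this style of assertion, here is a minimal sketch of what a polling wait does: it re-evaluates a condition until it holds or a timeout expires, which suits a check like "the queue is eventually gone". The class `PollingWait`, its parameter names, and the timeout values are hypothetical stand-ins for illustration; this is not the test-suite's `Wait` utility, whose defaults may differ.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a polling helper; not the Artemis Wait class.
final class PollingWait {

   private PollingWait() {
   }

   // Re-checks the condition every sleepMillis until it holds or timeoutMillis elapses.
   static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long sleepMillis) {
      final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
      while (true) {
         if (condition.getAsBoolean()) {
            return true;
         }
         if (System.nanoTime() - deadline >= 0) {
            return false;
         }
         try {
            Thread.sleep(sleepMillis);
         } catch (InterruptedException e) {
            // Preserve the interrupt flag and report the condition as not met.
            Thread.currentThread().interrupt();
            return false;
         }
      }
   }

   public static void main(String[] args) {
      final long start = System.currentTimeMillis();
      // Simulates a queue that is removed asynchronously roughly 50 ms after disconnect.
      boolean removed = waitFor(() -> System.currentTimeMillis() - start >= 50, 2000, 10);
      System.out.println("queue removed: " + removed);
   }
}
```

The point of the suggestion is that the lambda can express the whole condition, so no test-specific override of `waitForBindings` is needed.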


---


[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...

2019-01-09 Thread jbertram
Github user jbertram commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2466#discussion_r246562512
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 ---
@@ -3604,4 +3605,14 @@ private void 
deployReloadableConfigFromConfiguration() throws Exception {
   return externalComponents;
}
 
+   @Override
+   public Set queueConsumersQuery(SimpleString queueName) {
--- End diff --

This method isn't necessary. You can use the `locateQueue` method and 
simply invoke `getConsumers` on the returned value.
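
A rough sketch of the lookup-then-query pattern, assuming (as the `== null` check in the test suggestion implies) that the lookup returns `null` for a missing queue. `StubServer`, `StubQueue`, and `consumersOf` are hypothetical stand-ins that only mimic the shape of `locateQueue` and `getConsumers`; they are not the Artemis types.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Stand-in types (assumptions); they only mimic the shape of locateQueue/getConsumers.
final class ConsumerLookupSketch {

   static final class StubQueue {
      private final Set<String> consumers = new LinkedHashSet<>();

      Set<String> getConsumers() {
         return consumers;
      }
   }

   static final class StubServer {
      private final Map<String, StubQueue> queues = new HashMap<>();

      // Returns null when no queue with that name exists.
      StubQueue locateQueue(String name) {
         return queues.get(name);
      }

      StubQueue createQueue(String name) {
         return queues.computeIfAbsent(name, k -> new StubQueue());
      }
   }

   // Callers compose the two existing calls instead of adding a new server method.
   static Set<String> consumersOf(StubServer server, String queueName) {
      StubQueue queue = server.locateQueue(queueName);
      return queue == null ? Collections.emptySet() : queue.getConsumers();
   }

   public static void main(String[] args) {
      StubServer server = new StubServer();
      server.createQueue("sameClientId.clean.test").getConsumers().add("consumer-1");
      System.out.println(consumersOf(server, "sameClientId.clean.test"));
      System.out.println(consumersOf(server, "missing"));
   }
}
```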


---


[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...

2018-12-19 Thread onlyMIT
GitHub user onlyMIT reopened a pull request:

https://github.com/apache/activemq-artemis/pull/2466

ARTEMIS-2206 The MQTT consumer reconnection caused the queue to not be 
cle…

### Test environment

1. Test Artemis with 10,000 MQTT connections (9,000 senders, 1,000 consumers) 
on one server, with the 'cleanSession' property set to true;
2. MQTT client: paho 1.2.0;
3. Server: CPU Cores:32, Mem:64G, SSD:250G, HDD:1T

### Issue

**Issue 1**
Artemis broker has the following exception log:
`[Thread-0 
(ActiveMQ-remoting-threads-ActiveMQServerImpl::serverUUID=fb358579-feb3-11e8-bc7c-141877a7fd13-1409545055)]
 17:27:59,035 WARN  [org.apache.activemq.artemis.utils.actors.OrderedExecutor] 
null: java.lang.NullPointerException
at 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager.isClientConnected(MQTTProtocolManager.java:182)
 [:]
at 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager.disconnect(MQTTConnectionManager.java:150)
 [:]
at 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTFailureListener.connectionFailed(MQTTFailureListener.java:37)
 [:]
at 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection.fail(MQTTConnection.java:147)
 [:]
at 
org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.issueFailure(RemotingServiceImpl.java:561)
 [:]
at 
org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.connectionDestroyed(RemotingServiceImpl.java:542)
 [:]
at 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor$Listener.connectionDestroyed(NettyAcceptor.java:858)
 [:]
at 
org.apache.activemq.artemis.core.remoting.impl.netty.ActiveMQChannelHandler.lambda$channelInactive$0(ActiveMQChannelHandler.java:83)
 [:]
at 
org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
 [:]
at 
org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
 [:]
at 
org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:66)
 [:]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
[rt.jar:1.8.0_101]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
[rt.jar:1.8.0_101]
at 
org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
 [:]`

**Issue 2**
After closing all client connections, 64 queues were not cleaned up.

### Analysis and simulation reproduction

  When an MQTT consumer client (with the cleanSession property set to true) 
reconnects, there is some probability that its queue will not be 
automatically removed and that a NullPointerException will be thrown.
  This happens when the MQTT consumer client believes its connection has been 
lost and triggers a reconnection while the old MQTT connection is still alive 
on the Artemis broker. The bug occurs when the Artemis broker starts 
processing the 'new MQTT connection' while it is still closing the 'old MQTT 
connection'.
  To reproduce: create an MQTT consumer (cleanSession: true, clientID: 
superConsumer, topic: mit.test) and connect it to the Artemis broker. Create 
another MQTT consumer with the same cleanSession, clientID, and topic, and 
connect it to the Artemis broker as well. Then close both MQTT connections. 
Repeating this many times reproduces the two issues described above with some 
probability.

### Solution

**Issue 1**

  When 'session.getProtocolManager().isClientConnected(clientId, 
session.getConnection())' is called and the 'MQTTConnection' instance retrieved 
from 'connectedClients' is 'null', a NullPointerException is thrown. The fix 
adds a null check to the 'MQTTProtocolManager.isClientConnected' method.
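
As a minimal sketch of the null check described above: `ConnectedClientsSketch` and `StubConnection` are hypothetical stand-ins, and the comparison via `equals` is an assumption about how the registered connection is matched. The real `MQTTProtocolManager` may differ in its details.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the protocol manager's client registry.
final class ConnectedClientsSketch {

   static final class StubConnection {
   }

   private final Map<String, StubConnection> connectedClients = new ConcurrentHashMap<>();

   void addConnectedClient(String clientId, StubConnection connection) {
      connectedClients.put(clientId, connection);
   }

   void removeConnectedClient(String clientId) {
      connectedClients.remove(clientId);
   }

   // The entry may already have been removed by a concurrent disconnect,
   // so the lookup result is checked for null before it is dereferenced.
   boolean isClientConnected(String clientId, StubConnection connection) {
      StubConnection registered = connectedClients.get(clientId);
      return registered != null && registered.equals(connection);
   }

   public static void main(String[] args) {
      ConnectedClientsSketch manager = new ConnectedClientsSketch();
      StubConnection connection = new StubConnection();
      // No entry for this clientId: the guarded lookup returns false instead of throwing.
      System.out.println(manager.isClientConnected("superConsumer", connection));
   }
}
```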

**Issue 2** 

1. Remove 'InterruptedException' from the 
'MQTTConnectionManager.getSessionState' method, because this method never 
throws 'InterruptedException';
2. Make the 'MQTTConnectionManager.connect' and 
'MQTTConnectionManager.disconnect' methods 'synchronized' on the 
MQTTSessionState instance. In the Artemis broker, all MQTT connections using 
the same clientId share the same MQTTSessionState instance. With this lock, 
the 'connect' and 'disconnect' methods can no longer run concurrently for MQTT 
connections with the same clientId.
3. In the MQTT protocol there is exactly one consumer connection per queue, 
so closing the old MQTT consumer before the new MQTT consumer connects is a 
sound choice.
The original code could not effectively clean up the 'old consumer' on 
the queue when the 'new MQTT connection' connected to the Artemis broker. 
Modify 'MQTTSubscriptionManager.removeSubscription' to get the queue 
consumer collection from 
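
The locking idea in item 2 can be sketched as follows, under stated assumptions: `SessionLockSketch`, `SessionState`, and the `queueCount` field are hypothetical and only model "one shared state object per clientId, used as the monitor for both connect and disconnect". This is an illustration of the technique, not the code in the pull request.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model: one shared state object per clientId doubles as the lock.
final class SessionLockSketch {

   static final class SessionState {
      boolean attached;   // guarded by the SessionState monitor
      int queueCount;     // guarded by the SessionState monitor
   }

   private final Map<String, SessionState> states = new ConcurrentHashMap<>();

   private SessionState getSessionState(String clientId) {
      return states.computeIfAbsent(clientId, id -> new SessionState());
   }

   void connect(String clientId) {
      SessionState state = getSessionState(clientId);
      synchronized (state) {
         if (state.attached) {
            // Close the old consumer before the new connection attaches.
            cleanUp(state);
         }
         state.attached = true;
         state.queueCount++;
      }
   }

   void disconnect(String clientId) {
      SessionState state = getSessionState(clientId);
      synchronized (state) {
         if (state.attached) {
            cleanUp(state);
         }
      }
   }

   int queueCount(String clientId) {
      SessionState state = getSessionState(clientId);
      synchronized (state) {
         return state.queueCount;
      }
   }

   private static void cleanUp(SessionState state) {
      state.queueCount = 0;
      state.attached = false;
   }

   public static void main(String[] args) {
      SessionLockSketch sessions = new SessionLockSketch();
      sessions.connect("superConsumer");
      sessions.connect("superConsumer");   // reconnect while the old session is attached
      sessions.disconnect("superConsumer");
      System.out.println("queues left: " + sessions.queueCount("superConsumer"));
   }
}
```

Because connect and disconnect for one clientId take the same monitor, a disconnect of the old connection cannot interleave with the setup of the new one, which is the interleaving the analysis above blames for the leftover queues.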

[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...

2018-12-19 Thread onlyMIT
Github user onlyMIT closed the pull request at:

https://github.com/apache/activemq-artemis/pull/2466


---


[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...

2018-12-18 Thread onlyMIT
Github user onlyMIT commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2466#discussion_r242583621
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 ---
@@ -113,32 +113,7 @@
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.security.impl.SecurityStoreImpl;
-import org.apache.activemq.artemis.core.server.ActivateCallback;
--- End diff --

Thanks, this was caused by my IDE; I will watch out for it.


---


[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...

2018-12-18 Thread onlyMIT
Github user onlyMIT commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2466#discussion_r242582946
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
 ---
@@ -194,12 +191,13 @@ private synchronized void removeSubscription(String 
address) throws Exception {
   SimpleString internalQueueName = 
getQueueNameForTopic(internalAddress);
   session.getSessionState().removeSubscription(address);
 
-
-  ServerConsumer consumer = consumers.get(address);
+  Set queueConsumers = 
session.getServer().queueConsumersQuery(internalQueueName);
--- End diff --

The 'queueConsumersQuery' method uses 'queueName' to look up the 'Binding' 
and gets the queue from that 'Binding'. So I believe I am getting all the 
consumers on the specified queue, not all the consumers on the same address. 
You can check the details of the 'queueConsumersQuery' method.
The 'ActiveMQServerImpl.destroyQueue' method obtains the number of 
consumers on a queue in the same way. Originally I wanted to put the code that 
closes the consumers there, but I later found that 
'ActiveMQServerImpl.destroyQueue' is called from multiple places, so I 
abandoned that idea.


---


[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...

2018-12-18 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2466#discussion_r242541448
  
--- Diff: 
artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
 ---
@@ -194,12 +191,13 @@ private synchronized void removeSubscription(String 
address) throws Exception {
   SimpleString internalQueueName = 
getQueueNameForTopic(internalAddress);
   session.getSessionState().removeSubscription(address);
 
-
-  ServerConsumer consumer = consumers.get(address);
+  Set queueConsumers = 
session.getServer().queueConsumersQuery(internalQueueName);
--- End diff --

I think this is wrong. You should only check for consumers on this Session.

Your logic closes every consumer on the same address. If you keep multiple 
connections open, you will have a failure.


---


[GitHub] activemq-artemis pull request #2466: ARTEMIS-2206 The MQTT consumer reconnec...

2018-12-18 Thread clebertsuconic
Github user clebertsuconic commented on a diff in the pull request:

https://github.com/apache/activemq-artemis/pull/2466#discussion_r242540859
  
--- Diff: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 ---
@@ -113,32 +113,7 @@
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.security.SecurityStore;
 import org.apache.activemq.artemis.core.security.impl.SecurityStoreImpl;
-import org.apache.activemq.artemis.core.server.ActivateCallback;
--- End diff --

This will fail checkstyle: no * imports.


---