https://issues.apache.org/jira/browse/AMQ-6002 - escape client id in virtual 
topic mqtt subscription

(cherry picked from commit aa743cbd7ab7815bfa84e11aee9d3783c08e9eea)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/147400c2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/147400c2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/147400c2

Branch: refs/heads/activemq-5.12.x
Commit: 147400c231e4da740e844453a5c7abd653ba8b66
Parents: 9544354
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Wed Oct 7 11:28:19 2015 +0200
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Wed Oct 7 16:45:40 2015 +0000

----------------------------------------------------------------------
 .../MQTTVirtualTopicSubscriptionStrategy.java   |   7 +-
 .../activemq/transport/mqtt/PahoMQTTTest.java   | 115 +++++++++++--------
 2 files changed, 69 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/147400c2/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
index 457981a..434c248 100644
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
+++ 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
@@ -84,22 +84,21 @@ public class MQTTVirtualTopicSubscriptionStrategy extends 
AbstractMQTTSubscripti
         ActiveMQDestination destination = null;
         int prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
         ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
-
+        String converted = convertMQTTToActiveMQ(topicName);
         if (!protocol.isCleanSession() && protocol.getClientId() != null && 
requestedQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
-            String converted = convertMQTTToActiveMQ(topicName);
+
             if (converted.startsWith(VIRTUALTOPIC_PREFIX)) {
                 destination = new ActiveMQTopic(converted);
                 prefetch = 
ActiveMQPrefetchPolicy.DEFAULT_DURABLE_TOPIC_PREFETCH;
                 consumerInfo.setSubscriptionName(requestedQoS + ":" + 
topicName);
             } else {
                 converted = VIRTUALTOPIC_CONSUMER_PREFIX +
-                            protocol.getClientId() + ":" + requestedQoS + "." +
+                            convertMQTTToActiveMQ(protocol.getClientId()) + 
":" + requestedQoS + "." +
                             VIRTUALTOPIC_PREFIX + converted;
                 destination = new ActiveMQQueue(converted);
                 prefetch = ActiveMQPrefetchPolicy.DEFAULT_QUEUE_PREFETCH;
             }
         } else {
-            String converted = convertMQTTToActiveMQ(topicName);
             if (!converted.startsWith(VIRTUALTOPIC_PREFIX)) {
                 converted = VIRTUALTOPIC_PREFIX + converted;
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/147400c2/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
index 3e149ec..2890831 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoMQTTTest.java
@@ -43,6 +43,10 @@ public class PahoMQTTTest extends MQTTTestSupport {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PahoMQTTTest.class);
 
+    protected MessageConsumer createConsumer(Session s, String topic) throws 
Exception {
+        return s.createConsumer(s.createTopic(topic));
+    }
+
     @Test(timeout = 300000)
     public void testLotsOfClients() throws Exception {
 
@@ -52,7 +56,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
         ActiveMQConnection activeMQConnection = (ActiveMQConnection) 
cf.createConnection();
         activeMQConnection.start();
         Session s = activeMQConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = s.createConsumer(s.createTopic("test"));
+        MessageConsumer consumer = createConsumer(s, "test");
 
         final AtomicInteger receiveCounter = new AtomicInteger();
         consumer.setMessageListener(new MessageListener() {
@@ -118,7 +122,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
         ActiveMQConnection activeMQConnection = (ActiveMQConnection) 
cf.createConnection();
         activeMQConnection.start();
         Session s = activeMQConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = s.createConsumer(s.createTopic("test"));
+        MessageConsumer consumer = createConsumer(s, "test");
 
         MqttClient client = new MqttClient("tcp://localhost:" + getPort(), 
"clientid", new MemoryPersistence());
         client.connect();
@@ -128,16 +132,11 @@ public class PahoMQTTTest extends MQTTTestSupport {
         assertNotNull(msg);
 
         client.disconnect();
-        client.close();
     }
 
     @Test(timeout = 300000)
     public void testSubs() throws Exception {
 
-        stopBroker();
-        protocolConfig = 
"transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
-        startBroker();
-
         final DefaultListener listener = new DefaultListener();
         // subscriber connects and creates durable sub
         MqttClient client = createClient(false, "receive", listener);
@@ -195,10 +194,6 @@ public class PahoMQTTTest extends MQTTTestSupport {
     @Test(timeout = 300000)
     public void testOverlappingTopics() throws Exception {
 
-        stopBroker();
-        protocolConfig = 
"transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
-        startBroker();
-
         final DefaultListener listener = new DefaultListener();
         // subscriber connects and creates durable sub
         MqttClient client = createClient(false, "receive", listener);
@@ -278,7 +273,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
             public boolean isSatisified() throws Exception {
                 return listener.result != null;
             }
-        }, TimeUnit.SECONDS.toMillis(20)));
+        }, TimeUnit.SECONDS.toMillis(5)));
         assertNull(listener.result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
 
@@ -290,7 +285,7 @@ public class PahoMQTTTest extends MQTTTestSupport {
             public boolean isSatisified() throws Exception {
                 return listener.result != null;
             }
-        }, TimeUnit.SECONDS.toMillis(20)));
+        }, TimeUnit.SECONDS.toMillis(5)));
         assertNull(listener.result);
         assertTrue(client.getPendingDeliveryTokens().length == 0);
     }
@@ -354,53 +349,75 @@ public class PahoMQTTTest extends MQTTTestSupport {
     }
 
     @Test(timeout = 300000)
-    public void testVirtualTopicQueueRestore() throws Exception {
-
-        stopBroker();
-        protocolConfig = 
"transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
-        startBroker();
-
-        String user10 = "user10";
-        String password10 = "user10";
-        String clientId10 = "client-10";
-        String topic10 = "user10/";
-        MqttConnectOptions options10 = new MqttConnectOptions();
-        options10.setCleanSession(false);
-        options10.setUserName(user10);
-        options10.setPassword(password10.toCharArray());
-        MqttClient client10 = createClient(false, clientId10, null);
-        client10.subscribe(topic10 + clientId10 + "/#", 1);
-        client10.subscribe(topic10 + "#", 1);
-
-        String user1 = "user1";
-        String password1 = "user1";
-        String clientId1 = "client-1";
-        String topic1 = "user1/";
+    public void testClientIdSpecialChars() throws Exception {
+        testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1);
+        testClientIdSpecialChars(MqttConnectOptions.MQTT_VERSION_3_1_1);
+    }
+
+    protected void testClientId(String clientId, int mqttVersion, final 
DefaultListener clientAdminMqttCallback) throws Exception {
         MqttConnectOptions options1 = new MqttConnectOptions();
         options1.setCleanSession(false);
-        options1.setUserName(user1);
-        options1.setPassword(password1.toCharArray());
+        options1.setUserName("client1");
+        options1.setPassword("client1".toCharArray());
+        options1.setMqttVersion(mqttVersion);
+        final DefaultListener client1MqttCallback = new DefaultListener();
+        MqttClient client1 = createClient(options1, clientId, 
client1MqttCallback);
+        client1.setCallback(client1MqttCallback);
 
-        MqttClient client1 = createClient(false, clientId1, null);
-        client1.subscribe(topic1 + clientId1 + "/#", 1);
-        client1.subscribe(topic1 + "#", 1);
+        String topic = "client1/" + clientId + "/topic";
+        client1.subscribe(topic, 1);
 
-        RegionBroker regionBroker = (RegionBroker) 
brokerService.getBroker().getAdaptor(RegionBroker.class);
+        String message = "Message from client: " + clientId;
+        client1.publish(topic, message.getBytes(), 1, false);
 
-        String[] queues = new 
String[]{"Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.>",
-                
"Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.client-10.>",
-                "Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.>",
-                
"Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.client-1.>"};
 
-        for (String queueName : queues) {
-            Destination queue = 
regionBroker.getQueueRegion().getDestinations(new 
ActiveMQQueue(queueName)).iterator().next();
-            assertEquals("Queue " + queueName + " have more than one 
consumer", 1, queue.getConsumers().size());
-        }
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return client1MqttCallback.result != null;
+            }
+        }, TimeUnit.SECONDS.toMillis(45), 
TimeUnit.MILLISECONDS.toMillis(200)));
+        assertEquals(message, client1MqttCallback.result);
+        assertEquals(1, client1MqttCallback.received);
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return clientAdminMqttCallback.result != null;
+            }
+        }, TimeUnit.SECONDS.toMillis(45), 
TimeUnit.MILLISECONDS.toMillis(200)));
+        assertEquals(message, clientAdminMqttCallback.result);
+
+        assertTrue(client1.isConnected());
+        client1.disconnect();
+    }
+
+    protected void testClientIdSpecialChars(int mqttVersion) throws Exception {
+
+        LOG.info("Test MQTT version {}", mqttVersion);
+        MqttConnectOptions optionsAdmin = new MqttConnectOptions();
+        optionsAdmin.setCleanSession(false);
+        optionsAdmin.setUserName("admin");
+        optionsAdmin.setPassword("admin".toCharArray());
+
+        DefaultListener clientAdminMqttCallback = new DefaultListener();
+        MqttClient clientAdmin = createClient(optionsAdmin, "admin", 
clientAdminMqttCallback);
+        clientAdmin.subscribe("#", 1);
+
+        testClientId(":%&&@.:llll", mqttVersion, clientAdminMqttCallback);
+        testClientId("Consumer:id:AT_LEAST_ONCE", mqttVersion, 
clientAdminMqttCallback);
+        testClientId("Consumer:qid:EXACTLY_ONCE:VirtualTopic", mqttVersion, 
clientAdminMqttCallback);
+        
testClientId("Consumertestmin:testst:AT_LEAST_ONCE.VirtualTopic::AT_LEAST_ONCE",
 mqttVersion, clientAdminMqttCallback);
     }
 
+
     protected MqttClient createClient(boolean cleanSession, String clientId, 
MqttCallback listener) throws Exception {
         MqttConnectOptions options = new MqttConnectOptions();
         options.setCleanSession(cleanSession);
+        return createClient(options, clientId, listener);
+    }
+
+    protected MqttClient createClient(MqttConnectOptions options, String 
clientId, MqttCallback listener) throws Exception {
         final MqttClient client = new MqttClient("tcp://localhost:" + 
getPort(), clientId, new MemoryPersistence());
         client.setCallback(listener);
         client.connect(options);

Reply via email to