Repository: activemq-artemis
Updated Branches:
  refs/heads/ARTEMIS-780 a73aa0951 -> a33a047d4


Fix MQTT JMSSend Test


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a33a047d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a33a047d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a33a047d

Branch: refs/heads/ARTEMIS-780
Commit: a33a047d479c7c6bf58fa68afcaa22e7a8390524
Parents: 8766305
Author: Martyn Taylor <mtay...@redhat.com>
Authored: Thu Dec 1 12:45:15 2016 +0000
Committer: Martyn Taylor <mtay...@redhat.com>
Committed: Thu Dec 1 15:26:23 2016 +0000

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTPublishManager.java      | 16 +++++++++++++---
 .../core/protocol/mqtt/MQTTSubscriptionManager.java |  2 +-
 .../tests/integration/mqtt/imported/MQTTTest.java   | 11 ++++-------
 3 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index fb3363f..8218208 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -83,7 +83,7 @@ public class MQTTPublishManager {
    }
 
    private void createManagementAddress() {
-      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX +  
state.getClientId());
+      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + 
state.getClientId());
    }
 
    private void createManagementQueue() throws Exception {
@@ -113,10 +113,13 @@ public class MQTTPublishManager {
          if (qos == 0) {
             sendServerMessage((int) message.getMessageID(), 
(ServerMessageImpl) message, deliveryCount, qos);
             session.getServerSession().acknowledge(consumer.getID(), 
message.getMessageID());
-         } else {
+         } else if (qos == 1 || qos == 2) {
             int mqttid = outboundStore.generateMqttId(message.getMessageID(), 
consumer.getID());
             outboundStore.publish(mqttid, message.getMessageID(), 
consumer.getID());
             sendServerMessage(mqttid, (ServerMessageImpl) message, 
deliveryCount, qos);
+         } else {
+            // Client must have disconnected and it's Subscription QoS cleared
+            consumer.individualCancel(message.getMessageID(), false);
          }
       }
    }
@@ -232,7 +235,14 @@ public class MQTTPublishManager {
    }
 
    private int decideQoS(ServerMessage message, ServerConsumer consumer) {
-      int subscriptionQoS = 
session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
+
+      int subscriptionQoS = -1;
+      try {
+         subscriptionQoS = 
session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
+      } catch (NullPointerException e) {
+         // This can happen if the client disconnected during a server send.
+         return subscriptionQoS;
+      }
 
       int qos = 2;
       if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index b3542d3..c4b8b94 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -94,7 +94,7 @@ public class MQTTSubscriptionManager {
 
       Queue q = session.getServer().locateQueue(queue);
       if (q == null) {
-         q = session.getServerSession().createQueue(new SimpleString(address), 
queue, RoutingType.MULTICAST, managementFilter, false, 
MQTTUtil.DURABLE_MESSAGES && qos >= 0, true);
+         q = session.getServerSession().createQueue(new SimpleString(address), 
queue, RoutingType.MULTICAST, managementFilter, false, 
MQTTUtil.DURABLE_MESSAGES && qos >= 0, false);
       } else {
          if (q.isDeleteOnNoConsumers()) {
             throw 
ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), 
q.getName(), "deleteOnNoConsumers", false, true);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index c342853..58d75d8 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1096,8 +1096,8 @@ public class MQTTTest extends MQTTTestSupport {
       connection.start();
 
       Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      javax.jms.Queue queue = s.createQueue(destinationName);
-      MessageProducer producer = s.createProducer(queue);
+      javax.jms.Topic topic = s.createTopic(destinationName);
+      MessageProducer producer = s.createProducer(topic);
 
       // send retained message from JMS
       final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
@@ -1626,10 +1626,7 @@ public class MQTTTest extends MQTTTestSupport {
          SimpleString coreAddress = new SimpleString("foo.bar");
          Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", 
QoS.AT_LEAST_ONCE)};
 
-         AddressInfo addressInfo = new AddressInfo(coreAddress);
-         getServer().createOrUpdateAddressInfo(addressInfo);
-
-         getServer().createQueue(coreAddress, RoutingType.MULTICAST, new 
SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, false);
+         getServer().createQueue(coreAddress, RoutingType.MULTICAST, new 
SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, true);
 
          MQTT mqtt = createMQTTConnection();
          mqtt.setClientId(clientId);
@@ -1675,7 +1672,7 @@ public class MQTTTest extends MQTTTestSupport {
       try {
          String clientId = "testMqtt";
          SimpleString coreAddress = new SimpleString("foo.bar");
-         getServer().createQueue(coreAddress, RoutingType.MULTICAST, new 
SimpleString(clientId + "." + coreAddress), null, false, true, 
Queue.MAX_CONSUMERS_UNLIMITED, true, false);
+         getServer().createQueue(coreAddress, RoutingType.MULTICAST, new 
SimpleString(clientId + "." + coreAddress), null, false, true, 
Queue.MAX_CONSUMERS_UNLIMITED, true, true);
 
          Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", 
QoS.AT_LEAST_ONCE)};
 

Reply via email to