Repository: activemq-artemis Updated Branches: refs/heads/ARTEMIS-780 a73aa0951 -> a33a047d4
Fix MQTT JMSSend Test Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a33a047d Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a33a047d Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a33a047d Branch: refs/heads/ARTEMIS-780 Commit: a33a047d479c7c6bf58fa68afcaa22e7a8390524 Parents: 8766305 Author: Martyn Taylor <mtay...@redhat.com> Authored: Thu Dec 1 12:45:15 2016 +0000 Committer: Martyn Taylor <mtay...@redhat.com> Committed: Thu Dec 1 15:26:23 2016 +0000 ---------------------------------------------------------------------- .../core/protocol/mqtt/MQTTPublishManager.java | 16 +++++++++++++--- .../core/protocol/mqtt/MQTTSubscriptionManager.java | 2 +- .../tests/integration/mqtt/imported/MQTTTest.java | 11 ++++------- 3 files changed, 18 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index fb3363f..8218208 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -83,7 +83,7 @@ public class MQTTPublishManager { } private void createManagementAddress() { - managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId()); + managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId()); } private void createManagementQueue() throws Exception { @@ -113,10 +113,13 @@ public class MQTTPublishManager { if (qos == 0) { sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos); session.getServerSession().acknowledge(consumer.getID(), message.getMessageID()); - } else { + } else if (qos == 1 || qos == 2) { int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID()); outboundStore.publish(mqttid, message.getMessageID(), consumer.getID()); sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos); + } else { + // Client must have disconnected and it's Subscription QoS cleared + consumer.individualCancel(message.getMessageID(), false); } } } @@ -232,7 +235,14 @@ public class MQTTPublishManager { } private int decideQoS(ServerMessage message, ServerConsumer consumer) { - int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID()); + + int subscriptionQoS = -1; + try { + subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID()); + } catch (NullPointerException e) { + // This can happen if the client disconnected during a server send. + return subscriptionQoS; + } int qos = 2; if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index b3542d3..c4b8b94 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -94,7 +94,7 @@ public class MQTTSubscriptionManager { Queue q = session.getServer().locateQueue(queue); if (q == null) { - q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, true); + q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false); } else { if (q.isDeleteOnNoConsumers()) { throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index c342853..58d75d8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -1096,8 +1096,8 @@ public class MQTTTest extends MQTTTestSupport { connection.start(); Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - javax.jms.Queue queue = s.createQueue(destinationName); - MessageProducer producer = s.createProducer(queue); + javax.jms.Topic topic = s.createTopic(destinationName); + MessageProducer producer = s.createProducer(topic); // send retained message from JMS final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; @@ -1626,10 +1626,7 @@ public class MQTTTest extends MQTTTestSupport { SimpleString coreAddress = new SimpleString("foo.bar"); Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; - AddressInfo addressInfo = new AddressInfo(coreAddress); - getServer().createOrUpdateAddressInfo(addressInfo); - - getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, false); + getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, true); MQTT mqtt = createMQTTConnection(); mqtt.setClientId(clientId); @@ -1675,7 +1672,7 @@ public class MQTTTest extends MQTTTestSupport { try { String clientId = "testMqtt"; SimpleString coreAddress = new SimpleString("foo.bar"); - getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, false); + getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, true); Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};