This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 83ab47b  feat subscribe & unsubscribe (#2714)
83ab47b is described below

commit 83ab47bb681eb4ffad4191cc30116ef5e4a50fe4
Author: Sinsy <[email protected]>
AuthorDate: Thu Jan 6 11:21:41 2022 +0800

    feat subscribe & unsubscribe (#2714)
---
 .../org/apache/shenyu/protocol/mqtt/Publish.java   |  4 +-
 .../org/apache/shenyu/protocol/mqtt/Subscribe.java | 77 ++++++++++++++++++++++
 .../apache/shenyu/protocol/mqtt/Unsubscribe.java   | 23 ++++++-
 .../mqtt/repositories/SubscribeRepository.java     | 10 +++
 4 files changed, 110 insertions(+), 4 deletions(-)

diff --git 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Publish.java
 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Publish.java
index 584cea9..b8804cf 100644
--- 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Publish.java
+++ 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Publish.java
@@ -52,9 +52,7 @@ public class Publish extends MessageType {
         String message = byteBufToString(payload);
         //// todo qos
         MqttQoS mqttQoS = msg.fixedHeader().qosLevel();
-        if (mqttQoS.value() > 0) {
-            Singleton.INST.get(TopicRepository.class).add(topic, message);
-        }
+        Singleton.INST.get(TopicRepository.class).add(topic, message);
         int packetId = msg.variableHeader().packetId();
         CompletableFuture.runAsync(() -> send(topic, payload, packetId));
 
diff --git 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
index 188d17c..bfec427 100644
--- 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
+++ 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
@@ -17,8 +17,30 @@
 
 package org.apache.shenyu.protocol.mqtt;
 
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
 import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttSubAckPayload;
+import io.netty.handler.codec.mqtt.MqttSubAckMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.util.CharsetUtil;
+import org.apache.shenyu.common.utils.Singleton;
+import org.apache.shenyu.protocol.mqtt.repositories.TopicRepository;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
+import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
+import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
+import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
 
 /**
  *  Subscribe to named topics.
@@ -27,6 +49,61 @@ public class Subscribe extends MessageType {
 
     @Override
     public void subscribe(final ChannelHandlerContext ctx, final 
MqttSubscribeMessage msg) {
+        Channel channel = ctx.channel();
+
+        if (isConnected()) {
+            channel.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
+            return;
+        }
+        List<MqttTopicSubscription> mqttTopicSubscriptions = 
msg.payload().topicSubscriptions();
+        int packetId = msg.variableHeader().messageId();
+        //// todo Regular match
+        List<String> ackTopics = mqttTopicSubscriptions
+                .stream()
+                .filter(topicSub -> topicSub.qualityOfService() != FAILURE)
+                .map(MqttTopicSubscription::topicName)
+                .collect(Collectors.toList());
+        for (String ackTopic : ackTopics) {
+            String message = 
Singleton.INST.get(TopicRepository.class).get(ackTopic);
+            sendSubMessage(ackTopic, message, packetId, channel);
+
+        }
+
+        sendSubAckMessage(packetId, ackTopics, channel);
+    }
+
+    /**
+     * call back request of message.
+     * @param packetId packetId
+     * @param ackTopics ackTopics
+     * @param channel channel
+     */
+    private void sendSubAckMessage(final int packetId, final List<String> 
ackTopics, final Channel channel) {
+
+        List<Integer> qos = new ArrayList<>();
+        for (int i = 0; i < ackTopics.size(); i++) {
+            // default qos 0
+            qos.add(MqttQoS.AT_MOST_ONCE.value());
+        }
+
+        MqttFixedHeader fixedHeader = new 
MqttFixedHeader(MqttMessageType.SUBACK, false, AT_MOST_ONCE,
+                false, 0);
+        MqttSubAckPayload payload = new MqttSubAckPayload(qos);
+        MqttSubAckMessage mqttSubAckMessage = new 
MqttSubAckMessage(fixedHeader, from(packetId), payload);
+        channel.writeAndFlush(mqttSubAckMessage);
+    }
 
+    /**
+     * send subscribe message.
+     * @param topic topic
+     * @param message message
+     * @param packetId packetId
+     * @param channel channel
+     */
+    private void sendSubMessage(final String topic, final String message, 
final int packetId, final Channel channel) {
+        MqttFixedHeader fixedHeader = new 
MqttFixedHeader(MqttMessageType.PUBLISH, false, AT_MOST_ONCE, true, 0);
+        MqttPublishVariableHeader varHeader = new 
MqttPublishVariableHeader(topic, packetId);
+        MqttPublishMessage mqttPublishMessage = new 
MqttPublishMessage(fixedHeader, varHeader, Unpooled.copiedBuffer(message, 
CharsetUtil.UTF_8));
+        channel.writeAndFlush(mqttPublishMessage);
     }
 }
diff --git 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Unsubscribe.java
 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Unsubscribe.java
index 777ac3e..376015a 100644
--- 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Unsubscribe.java
+++ 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Unsubscribe.java
@@ -17,8 +17,19 @@
 
 package org.apache.shenyu.protocol.mqtt;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
 import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
+import org.apache.shenyu.common.utils.Singleton;
+import org.apache.shenyu.protocol.mqtt.repositories.SubscribeRepository;
+
+import java.util.List;
+
+import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
 
 /**
  * Unsubscribe from named topics.
@@ -27,6 +38,16 @@ public class Unsubscribe extends MessageType {
 
     @Override
     public void unsubscribe(final ChannelHandlerContext ctx, final 
MqttUnsubscribeMessage msg) {
-
+        if (isConnected()) {
+            return;
+        }
+        List<String> topics = msg.payload().topics();
+        Channel channel = ctx.channel();
+        Singleton.INST.get(SubscribeRepository.class).remove(topics, channel);
+        int packetId = msg.variableHeader().messageId();
+        MqttFixedHeader mqttFixedHeader = new 
MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 
0);
+        MqttUnsubAckMessage mqttUnsubAckMessage = new 
MqttUnsubAckMessage(mqttFixedHeader, from(packetId));
+        channel.writeAndFlush(mqttUnsubAckMessage);
     }
+
 }
diff --git 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
index ac61252..bf987bc 100644
--- 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
+++ 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
@@ -50,6 +50,15 @@ public class SubscribeRepository implements 
BaseRepository<List<String>, List<Ch
         CompletableFuture.runAsync(() -> 
topics.parallelStream().forEach(TOPIC_CHANNEL_FACTORY::remove));
     }
 
+    /**
+     * remove subscribe channel.
+     * @param topics topics
+     * @param channel channel
+     */
+    public void remove(final List<String> topics, final Channel channel) {
+        CompletableFuture.runAsync(() -> topics.parallelStream().forEach(topic 
-> TOPIC_CHANNEL_FACTORY.get(topic).remove(channel)));
+    }
+
     @Override
     public List<Channel> get(final List<String> topics) {
         Set<Channel> channels = new CopyOnWriteArraySet<>();
@@ -65,4 +74,5 @@ public class SubscribeRepository implements 
BaseRepository<List<String>, List<Ch
     public List<Channel> get(final String topic) {
         return TOPIC_CHANNEL_FACTORY.getOrDefault(topic, new 
CopyOnWriteArrayList<>());
     }
+
 }

Reply via email to