This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 83ab47b feat subscribe & unsubscribe (#2714)
83ab47b is described below
commit 83ab47bb681eb4ffad4191cc30116ef5e4a50fe4
Author: Sinsy <[email protected]>
AuthorDate: Thu Jan 6 11:21:41 2022 +0800
feat subscribe & unsubscribe (#2714)
---
.../org/apache/shenyu/protocol/mqtt/Publish.java | 4 +-
.../org/apache/shenyu/protocol/mqtt/Subscribe.java | 77 ++++++++++++++++++++++
.../apache/shenyu/protocol/mqtt/Unsubscribe.java | 23 ++++++-
.../mqtt/repositories/SubscribeRepository.java | 10 +++
4 files changed, 110 insertions(+), 4 deletions(-)
diff --git
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Publish.java
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Publish.java
index 584cea9..b8804cf 100644
---
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Publish.java
+++
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Publish.java
@@ -52,9 +52,7 @@ public class Publish extends MessageType {
String message = byteBufToString(payload);
//// todo qos
MqttQoS mqttQoS = msg.fixedHeader().qosLevel();
- if (mqttQoS.value() > 0) {
- Singleton.INST.get(TopicRepository.class).add(topic, message);
- }
+ Singleton.INST.get(TopicRepository.class).add(topic, message);
int packetId = msg.variableHeader().packetId();
CompletableFuture.runAsync(() -> send(topic, payload, packetId));
diff --git
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
index 188d17c..bfec427 100644
---
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
+++
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
@@ -17,8 +17,30 @@
package org.apache.shenyu.protocol.mqtt;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttSubAckPayload;
+import io.netty.handler.codec.mqtt.MqttSubAckMessage;
+import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
+import io.netty.handler.codec.mqtt.MqttPublishMessage;
+import io.netty.util.CharsetUtil;
+import org.apache.shenyu.common.utils.Singleton;
+import org.apache.shenyu.protocol.mqtt.repositories.TopicRepository;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
+import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
+import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
+import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
/**
* Subscribe to named topics.
@@ -27,6 +49,61 @@ public class Subscribe extends MessageType {
@Override
public void subscribe(final ChannelHandlerContext ctx, final
MqttSubscribeMessage msg) {
+     // Handles a SUBSCRIBE: for each requested topic, forwards the message that
+     // TopicRepository returns for it, then answers with a SUBACK.
+     // NOTE(review): nothing in this method records the channel as a subscriber
+     // of the topics; confirm that registration happens elsewhere.
+     final Channel subscriber = ctx.channel();
+
+     // NOTE(review): the channel is closed when isConnected() is true. isConnected()
+     // is defined outside this diff; confirm the condition is not inverted.
+     if (isConnected()) {
+         subscriber.close().addListener(FIRE_EXCEPTION_ON_FAILURE);
+         return;
+     }
+
+     final List<MqttTopicSubscription> requested = msg.payload().topicSubscriptions();
+     final int messageId = msg.variableHeader().messageId();
+     //// todo Regular match
+     // Keep only the topic names whose requested QoS is not FAILURE.
+     final List<String> grantedTopics = new ArrayList<>();
+     for (MqttTopicSubscription subscription : requested) {
+         if (subscription.qualityOfService() != FAILURE) {
+             grantedTopics.add(subscription.topicName());
+         }
+     }
+
+     // Each forwarded PUBLISH reuses the message id of the SUBSCRIBE packet.
+     grantedTopics.forEach(topic ->
+             sendSubMessage(topic, Singleton.INST.get(TopicRepository.class).get(topic), messageId, subscriber));
+
+     sendSubAckMessage(messageId, grantedTopics, subscriber);
+ }
+
+ /**
+  * Answer a SUBSCRIBE with a SUBACK that reports QoS 0 for every acknowledged topic.
+  *
+  * @param packetId  message id placed in the SUBACK variable header
+  * @param ackTopics acknowledged topics; one QoS entry is produced per element
+  * @param channel   channel the SUBACK is written to
+  */
+ private void sendSubAckMessage(final int packetId, final List<String> ackTopics, final Channel channel) {
+     // default qos 0
+     final List<Integer> grantedQos = ackTopics.stream()
+             .map(topic -> MqttQoS.AT_MOST_ONCE.value())
+             .collect(Collectors.toList());
+
+     final MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, AT_MOST_ONCE, false, 0);
+     final MqttSubAckPayload ackPayload = new MqttSubAckPayload(grantedQos);
+     channel.writeAndFlush(new MqttSubAckMessage(header, from(packetId), ackPayload));
+ }
+ /**
+  * Send a topic's message to a single channel as a QoS 0 PUBLISH with the retain flag set.
+  *
+  * @param topic    topic name placed in the PUBLISH variable header
+  * @param message  payload text, encoded as UTF-8; when {@code null} nothing is sent
+  * @param packetId packet id placed in the PUBLISH variable header
+  * @param channel  channel the PUBLISH is written to
+  */
+ private void sendSubMessage(final String topic, final String message, final int packetId, final Channel channel) {
+     // Unpooled.copiedBuffer rejects a null string, which would abort subscribe()
+     // before its SUBACK is written. Presumably the topic lookup yields null when
+     // no message is stored for the topic - verify against TopicRepository.
+     if (message == null) {
+         return;
+     }
+     MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, AT_MOST_ONCE, true, 0);
+     MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, packetId);
+     MqttPublishMessage mqttPublishMessage = new MqttPublishMessage(fixedHeader, varHeader,
+             Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
+     channel.writeAndFlush(mqttPublishMessage);
}
}
diff --git
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Unsubscribe.java
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Unsubscribe.java
index 777ac3e..376015a 100644
---
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Unsubscribe.java
+++
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Unsubscribe.java
@@ -17,8 +17,19 @@
package org.apache.shenyu.protocol.mqtt;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
+import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
+import org.apache.shenyu.common.utils.Singleton;
+import org.apache.shenyu.protocol.mqtt.repositories.SubscribeRepository;
+
+import java.util.List;
+
+import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
/**
* Unsubscribe from named topics.
@@ -27,6 +38,16 @@ public class Unsubscribe extends MessageType {
@Override
public void unsubscribe(final ChannelHandlerContext ctx, final
MqttUnsubscribeMessage msg) {
-
+     // NOTE(review): the request is ignored when isConnected() is true. isConnected()
+     // is defined outside this diff; confirm the condition is not inverted.
+     if (isConnected()) {
+         return;
+     }
+     // Drop this channel from the subscriber list of every listed topic.
+     final List<String> unsubscribed = msg.payload().topics();
+     final Channel subscriber = ctx.channel();
+     Singleton.INST.get(SubscribeRepository.class).remove(unsubscribed, subscriber);
+
+     // Acknowledge with an UNSUBACK carrying the message id of the UNSUBSCRIBE packet.
+     final int messageId = msg.variableHeader().messageId();
+     final MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
+     subscriber.writeAndFlush(new MqttUnsubAckMessage(header, from(messageId)));
}
+
}
diff --git
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
index ac61252..bf987bc 100644
---
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
+++
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
@@ -50,6 +50,15 @@ public class SubscribeRepository implements
BaseRepository<List<String>, List<Ch
CompletableFuture.runAsync(() ->
topics.parallelStream().forEach(TOPIC_CHANNEL_FACTORY::remove));
}
+ /**
+  * Remove a channel from the subscriber list of each given topic.
+  * The removal is submitted through {@code CompletableFuture.runAsync}, so it may
+  * not have happened yet when this method returns.
+  *
+  * @param topics  topics the channel is removed from; topics without an entry are skipped
+  * @param channel channel to remove
+  */
+ public void remove(final List<String> topics, final Channel channel) {
+     CompletableFuture.runAsync(() -> topics.parallelStream().forEach(topic -> {
+         List<Channel> subscribers = TOPIC_CHANNEL_FACTORY.get(topic);
+         // A topic without an entry yields null here; skip it rather than let a
+         // NullPointerException escape unobserved inside the async task.
+         if (subscribers != null) {
+             subscribers.remove(channel);
+         }
+     }));
+ }
+
@Override
public List<Channel> get(final List<String> topics) {
Set<Channel> channels = new CopyOnWriteArraySet<>();
@@ -65,4 +74,5 @@ public class SubscribeRepository implements
BaseRepository<List<String>, List<Ch
public List<Channel> get(final String topic) {
return TOPIC_CHANNEL_FACTORY.getOrDefault(topic, new
CopyOnWriteArrayList<>());
}
+
}