This is an automated email from the ASF dual-hosted git repository.

impactcn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b69199e7 [ISSUE #3118]  feature protocol-mqtt: enhanced mqtt plugin 
(#3377)
0b69199e7 is described below

commit 0b69199e7ec070c0da47edb6e9951a4efddcb0d8
Author: 拉姆徐 <[email protected]>
AuthorDate: Fri May 6 14:12:53 2022 +0800

    [ISSUE #3118]  feature protocol-mqtt: enhanced mqtt plugin (#3377)
    
    * feature protocol-mqtt: handle pingReq message
    
    * feature protocol-mqtt: handle unSubscribe message
---
 .../org/apache/shenyu/protocol/mqtt/Connect.java   | 30 ++++++++++++++++++++--
 .../apache/shenyu/protocol/mqtt/MessageType.java   |  3 +++
 .../shenyu/protocol/mqtt/MqttTransportHandler.java |  2 --
 .../{MqttTransportHandler.java => PingReq.java}    | 25 +++---------------
 .../{MqttTransportHandler.java => PingResp.java}   | 30 ++++++++--------------
 .../org/apache/shenyu/protocol/mqtt/Subscribe.java | 11 ++++++--
 .../mqtt/repositories/SubscribeRepository.java     | 18 ++++++++++++-
 7 files changed, 71 insertions(+), 48 deletions(-)

diff --git 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Connect.java
 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Connect.java
index 3f5174ed1..8a9fa005a 100644
--- 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Connect.java
+++ 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Connect.java
@@ -22,29 +22,46 @@ import io.netty.handler.codec.mqtt.MqttConnAckMessage;
 import io.netty.handler.codec.mqtt.MqttConnectMessage;
 import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
 import io.netty.handler.codec.mqtt.MqttMessageBuilders;
+import io.netty.handler.codec.mqtt.MqttVersion;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.shenyu.common.utils.Singleton;
 import org.apache.shenyu.protocol.mqtt.repositories.ChannelRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
+import static 
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
+import static 
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
+import static 
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
 
 /**
  * Client requests a connection to a server.
  */
 public class Connect extends MessageType {
 
+    private static final Logger LOG = LoggerFactory.getLogger(Connect.class);
+
     @Override
     public void connect(final ChannelHandlerContext ctx, final 
MqttConnectMessage msg) {
 
         String clientId = msg.payload().clientIdentifier();
         if (StringUtils.isEmpty(clientId)) {
-            
ctx.writeAndFlush(wrong(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED));
+            LOG.info("MQTT clientId can not be empty.");
+            close(ctx, CONNECTION_REFUSED_IDENTIFIER_REJECTED);
             return;
         }
 
+        if (!allowedProtocolVersion(msg)) {
+            LOG.info("MQTT protocol version is not supported. clientId: {}", 
clientId);
+            close(ctx, CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
+        }
+
         String userName = msg.payload().userName();
         byte[] passwordInBytes = msg.payload().passwordInBytes();
 
         if (!MqttContext.isValid(userName, passwordInBytes)) {
-            
ctx.writeAndFlush(wrong(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
+            LOG.info("MQTT client verification failed, please check the 
username and password.");
+            close(ctx, CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
             return;
         }
 
@@ -58,10 +75,19 @@ public class Connect extends MessageType {
         setConnected(true);
     }
 
+    private void close(final ChannelHandlerContext ctx, final 
MqttConnectReturnCode returnCode) {
+        ctx.writeAndFlush(wrong(returnCode));
+        ctx.close().addListener(CLOSE_ON_FAILURE);
+    }
+
     private MqttConnAckMessage wrong(final MqttConnectReturnCode returnCode) {
         return MqttMessageBuilders.connAck()
                 .returnCode(returnCode)
                 .sessionPresent(false)
                 .build();
     }
+
+    private boolean allowedProtocolVersion(final MqttConnectMessage msg) {
+        return msg.variableHeader().version() == 
MqttVersion.MQTT_3_1.protocolLevel();
+    }
 }
diff --git 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MessageType.java
 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MessageType.java
index 4cc096aaf..9d766f31b 100644
--- 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MessageType.java
+++ 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MessageType.java
@@ -67,16 +67,19 @@ public class MessageType implements AbstractMessageType {
     @Override
     public void unsubscribe(final ChannelHandlerContext ctx, final 
MqttUnsubscribeMessage msg) {
         //// todo polymorphism unsubscribe
+        new Unsubscribe().unsubscribe(ctx, msg);
     }
 
     @Override
     public void pingReq(final ChannelHandlerContext ctx) {
         //// todo polymorphism pingReq
+        new PingReq().pingReq(ctx);
     }
 
     @Override
     public void pingResp(final ChannelHandlerContext ctx) {
         //// todo polymorphism pingResp
+        new PingResp().pingResp(ctx);
     }
 
     @Override
diff --git 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
index 2c4677a08..49ddc8a05 100644
--- 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
+++ 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
@@ -36,8 +36,6 @@ public class MqttTransportHandler extends 
ChannelInboundHandlerAdapter implement
         } else {
             ctx.close();
         }
-
-        super.channelRead(ctx, msg);
     }
 
     @Override
diff --git 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingReq.java
similarity index 53%
copy from 
shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
copy to 
shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingReq.java
index 2c4677a08..716cf5491 100644
--- 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
+++ 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingReq.java
@@ -18,31 +18,14 @@
 package org.apache.shenyu.protocol.mqtt;
 
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.mqtt.MqttMessage;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 
 /**
- * mqtt transport handler.
+ * Client sends pingreq to the server.
  */
-public class MqttTransportHandler extends ChannelInboundHandlerAdapter 
implements GenericFutureListener<Future<? super Void>> {
+public class PingReq extends MessageType {
 
     @Override
-    public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
throws Exception {
-        if (msg instanceof MqttMessage) {
-            MqttFactory mqttFactory = new MqttFactory((MqttMessage) msg, ctx);
-            mqttFactory.connect();
-        } else {
-            ctx.close();
-        }
-
-        super.channelRead(ctx, msg);
-    }
-
-    @Override
-    public void operationComplete(final Future<? super Void> future) throws 
Exception {
-
+    public void pingReq(final ChannelHandlerContext ctx) {
+        new PingResp().pingResp(ctx);
     }
-
 }
diff --git 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingResp.java
similarity index 56%
copy from 
shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
copy to 
shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingResp.java
index 2c4677a08..ac258697a 100644
--- 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
+++ 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingResp.java
@@ -18,31 +18,21 @@
 package org.apache.shenyu.protocol.mqtt;
 
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
 
 /**
- * mqtt transport handler.
+ * Mqtt Server send pingresp to respond to pingreq of the client.
  */
-public class MqttTransportHandler extends ChannelInboundHandlerAdapter 
implements GenericFutureListener<Future<? super Void>> {
+public class PingResp extends MessageType {
 
     @Override
-    public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
throws Exception {
-        if (msg instanceof MqttMessage) {
-            MqttFactory mqttFactory = new MqttFactory((MqttMessage) msg, ctx);
-            mqttFactory.connect();
-        } else {
-            ctx.close();
-        }
-
-        super.channelRead(ctx, msg);
-    }
-
-    @Override
-    public void operationComplete(final Future<? super Void> future) throws 
Exception {
-
+    public void pingResp(final ChannelHandlerContext ctx) {
+        MqttFixedHeader pingreqFixedHeader = new 
MqttFixedHeader(MqttMessageType.PINGRESP, false,
+                MqttQoS.AT_MOST_ONCE, false, 0);
+        MqttMessage pingResp = new MqttMessage(pingreqFixedHeader);
+        ctx.writeAndFlush(pingResp);
     }
-
 }
diff --git 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
index bfec42788..9e8d184d0 100644
--- 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
+++ 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
@@ -30,7 +30,9 @@ import io.netty.handler.codec.mqtt.MqttSubAckMessage;
 import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.util.CharsetUtil;
+import org.apache.logging.log4j.util.Strings;
 import org.apache.shenyu.common.utils.Singleton;
+import org.apache.shenyu.protocol.mqtt.repositories.SubscribeRepository;
 import org.apache.shenyu.protocol.mqtt.repositories.TopicRepository;
 
 import java.util.ArrayList;
@@ -57,16 +59,21 @@ public class Subscribe extends MessageType {
         }
         List<MqttTopicSubscription> mqttTopicSubscriptions = 
msg.payload().topicSubscriptions();
         int packetId = msg.variableHeader().messageId();
+
         //// todo Regular match
         List<String> ackTopics = mqttTopicSubscriptions
                 .stream()
                 .filter(topicSub -> topicSub.qualityOfService() != FAILURE)
                 .map(MqttTopicSubscription::topicName)
                 .collect(Collectors.toList());
+
+        Singleton.INST.get(SubscribeRepository.class).add(ctx.channel(), 
mqttTopicSubscriptions);
+
         for (String ackTopic : ackTopics) {
             String message = 
Singleton.INST.get(TopicRepository.class).get(ackTopic);
-            sendSubMessage(ackTopic, message, packetId, channel);
-
+            if (Strings.isNotEmpty(message)) {
+                sendSubMessage(ackTopic, message, packetId, channel);
+            }
         }
 
         sendSubAckMessage(packetId, ackTopics, channel);
diff --git 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
index bf987bcfd..39e82c440 100644
--- 
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
+++ 
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
@@ -18,6 +18,9 @@
 package org.apache.shenyu.protocol.mqtt.repositories;
 
 import io.netty.channel.Channel;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
@@ -32,17 +35,30 @@ import java.util.concurrent.CopyOnWriteArraySet;
  */
 public class SubscribeRepository implements BaseRepository<List<String>, 
List<Channel>> {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(SubscribeRepository.class);
+
     private static final Map<String, List<Channel>> TOPIC_CHANNEL_FACTORY = 
new ConcurrentHashMap<>();
 
     @Override
     public void add(final List<String> topics, final List<Channel> channels) {
-
         CompletableFuture.runAsync(() -> topics.parallelStream().forEach(s -> {
             List<Channel> list = get(s);
             list.addAll(channels);
             TOPIC_CHANNEL_FACTORY.put(s, list);
         }));
+    }
 
+    /**
+     * add subscribe channel.
+     * @param channel channel
+     * @param mqttTopicSubscription mqtt subscription info
+     */
+    public void add(final Channel channel, final List<MqttTopicSubscription> 
mqttTopicSubscription) {
+        CompletableFuture.runAsync(() -> 
mqttTopicSubscription.parallelStream().forEach(s -> {
+            List<Channel> channels = get(s.topicName());
+            channels.add(channel);
+            TOPIC_CHANNEL_FACTORY.put(s.topicName(), channels);
+        }));
     }
 
     @Override

Reply via email to