This is an automated email from the ASF dual-hosted git repository.
impactcn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 0b69199e7 [ISSUE #3118] feature protocol-mqtt: enhanced mqtt plugin
(#3377)
0b69199e7 is described below
commit 0b69199e7ec070c0da47edb6e9951a4efddcb0d8
Author: 拉姆徐 <[email protected]>
AuthorDate: Fri May 6 14:12:53 2022 +0800
[ISSUE #3118] feature protocol-mqtt: enhanced mqtt plugin (#3377)
* feature protocol-mqtt: handle pingReq message
* feature protocol-mqtt: handle unSubscribe message
---
.../org/apache/shenyu/protocol/mqtt/Connect.java | 30 ++++++++++++++++++++--
.../apache/shenyu/protocol/mqtt/MessageType.java | 3 +++
.../shenyu/protocol/mqtt/MqttTransportHandler.java | 2 --
.../{MqttTransportHandler.java => PingReq.java} | 25 +++---------------
.../{MqttTransportHandler.java => PingResp.java} | 30 ++++++++--------------
.../org/apache/shenyu/protocol/mqtt/Subscribe.java | 11 ++++++--
.../mqtt/repositories/SubscribeRepository.java | 18 ++++++++++++-
7 files changed, 71 insertions(+), 48 deletions(-)
diff --git
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Connect.java
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Connect.java
index 3f5174ed1..8a9fa005a 100644
---
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Connect.java
+++
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Connect.java
@@ -22,29 +22,46 @@ import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
+import io.netty.handler.codec.mqtt.MqttVersion;
import org.apache.commons.lang3.StringUtils;
import org.apache.shenyu.common.utils.Singleton;
import org.apache.shenyu.protocol.mqtt.repositories.ChannelRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
+import static
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
+import static
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
+import static
io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
/**
* Client requests a connection to a server.
*/
public class Connect extends MessageType {
+ private static final Logger LOG = LoggerFactory.getLogger(Connect.class);
+
@Override
public void connect(final ChannelHandlerContext ctx, final
MqttConnectMessage msg) {
String clientId = msg.payload().clientIdentifier();
if (StringUtils.isEmpty(clientId)) {
-
ctx.writeAndFlush(wrong(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED));
+ LOG.info("MQTT clientId can not be empty.");
+ close(ctx, CONNECTION_REFUSED_IDENTIFIER_REJECTED);
return;
}
+ if (!allowedProtocolVersion(msg)) {
+ LOG.info("MQTT protocol version is not supported. clientId: {}",
clientId);
+ close(ctx, CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
+ }
+
String userName = msg.payload().userName();
byte[] passwordInBytes = msg.payload().passwordInBytes();
if (!MqttContext.isValid(userName, passwordInBytes)) {
-
ctx.writeAndFlush(wrong(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD));
+ LOG.info("MQTT client verification failed, please check the
username and password.");
+ close(ctx, CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
return;
}
@@ -58,10 +75,19 @@ public class Connect extends MessageType {
setConnected(true);
}
+ private void close(final ChannelHandlerContext ctx, final
MqttConnectReturnCode returnCode) {
+ ctx.writeAndFlush(wrong(returnCode));
+ ctx.close().addListener(CLOSE_ON_FAILURE);
+ }
+
private MqttConnAckMessage wrong(final MqttConnectReturnCode returnCode) {
return MqttMessageBuilders.connAck()
.returnCode(returnCode)
.sessionPresent(false)
.build();
}
+
+ private boolean allowedProtocolVersion(final MqttConnectMessage msg) {
+ return msg.variableHeader().version() ==
MqttVersion.MQTT_3_1.protocolLevel();
+ }
}
diff --git
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MessageType.java
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MessageType.java
index 4cc096aaf..9d766f31b 100644
---
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MessageType.java
+++
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MessageType.java
@@ -67,16 +67,19 @@ public class MessageType implements AbstractMessageType {
@Override
public void unsubscribe(final ChannelHandlerContext ctx, final
MqttUnsubscribeMessage msg) {
//// todo polymorphism unsubscribe
+ new Unsubscribe().unsubscribe(ctx, msg);
}
@Override
public void pingReq(final ChannelHandlerContext ctx) {
//// todo polymorphism pingReq
+ new PingReq().pingReq(ctx);
}
@Override
public void pingResp(final ChannelHandlerContext ctx) {
//// todo polymorphism pingResp
+ new PingResp().pingResp(ctx);
}
@Override
diff --git
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
index 2c4677a08..49ddc8a05 100644
---
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
+++
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
@@ -36,8 +36,6 @@ public class MqttTransportHandler extends
ChannelInboundHandlerAdapter implement
} else {
ctx.close();
}
-
- super.channelRead(ctx, msg);
}
@Override
diff --git
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingReq.java
similarity index 53%
copy from
shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
copy to
shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingReq.java
index 2c4677a08..716cf5491 100644
---
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
+++
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingReq.java
@@ -18,31 +18,14 @@
package org.apache.shenyu.protocol.mqtt;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.mqtt.MqttMessage;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
/**
- * mqtt transport handler.
+ * Client sends pingreq to the server.
*/
-public class MqttTransportHandler extends ChannelInboundHandlerAdapter
implements GenericFutureListener<Future<? super Void>> {
+public class PingReq extends MessageType {
@Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg)
throws Exception {
- if (msg instanceof MqttMessage) {
- MqttFactory mqttFactory = new MqttFactory((MqttMessage) msg, ctx);
- mqttFactory.connect();
- } else {
- ctx.close();
- }
-
- super.channelRead(ctx, msg);
- }
-
- @Override
- public void operationComplete(final Future<? super Void> future) throws
Exception {
-
+ public void pingReq(final ChannelHandlerContext ctx) {
+ new PingResp().pingResp(ctx);
}
-
}
diff --git
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingResp.java
similarity index 56%
copy from
shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
copy to
shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingResp.java
index 2c4677a08..ac258697a 100644
---
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/MqttTransportHandler.java
+++
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/PingResp.java
@@ -18,31 +18,21 @@
package org.apache.shenyu.protocol.mqtt;
import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.handler.codec.mqtt.MqttMessageType;
+import io.netty.handler.codec.mqtt.MqttQoS;
/**
- * mqtt transport handler.
+ * Mqtt Server send pingresp to respond to pingreq of the client.
*/
-public class MqttTransportHandler extends ChannelInboundHandlerAdapter
implements GenericFutureListener<Future<? super Void>> {
+public class PingResp extends MessageType {
@Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg)
throws Exception {
- if (msg instanceof MqttMessage) {
- MqttFactory mqttFactory = new MqttFactory((MqttMessage) msg, ctx);
- mqttFactory.connect();
- } else {
- ctx.close();
- }
-
- super.channelRead(ctx, msg);
- }
-
- @Override
- public void operationComplete(final Future<? super Void> future) throws
Exception {
-
+ public void pingResp(final ChannelHandlerContext ctx) {
+ MqttFixedHeader pingreqFixedHeader = new
MqttFixedHeader(MqttMessageType.PINGRESP, false,
+ MqttQoS.AT_MOST_ONCE, false, 0);
+ MqttMessage pingResp = new MqttMessage(pingreqFixedHeader);
+ ctx.writeAndFlush(pingResp);
}
-
}
diff --git
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
index bfec42788..9e8d184d0 100644
---
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
+++
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/Subscribe.java
@@ -30,7 +30,9 @@ import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.util.CharsetUtil;
+import org.apache.logging.log4j.util.Strings;
import org.apache.shenyu.common.utils.Singleton;
+import org.apache.shenyu.protocol.mqtt.repositories.SubscribeRepository;
import org.apache.shenyu.protocol.mqtt.repositories.TopicRepository;
import java.util.ArrayList;
@@ -57,16 +59,21 @@ public class Subscribe extends MessageType {
}
List<MqttTopicSubscription> mqttTopicSubscriptions =
msg.payload().topicSubscriptions();
int packetId = msg.variableHeader().messageId();
+
//// todo Regular match
List<String> ackTopics = mqttTopicSubscriptions
.stream()
.filter(topicSub -> topicSub.qualityOfService() != FAILURE)
.map(MqttTopicSubscription::topicName)
.collect(Collectors.toList());
+
+ Singleton.INST.get(SubscribeRepository.class).add(ctx.channel(),
mqttTopicSubscriptions);
+
for (String ackTopic : ackTopics) {
String message =
Singleton.INST.get(TopicRepository.class).get(ackTopic);
- sendSubMessage(ackTopic, message, packetId, channel);
-
+ if (Strings.isNotEmpty(message)) {
+ sendSubMessage(ackTopic, message, packetId, channel);
+ }
}
sendSubAckMessage(packetId, ackTopics, channel);
diff --git
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
index bf987bcfd..39e82c440 100644
---
a/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
+++
b/shenyu-protocol/shenyu-protocol-mqtt/src/main/java/org/apache/shenyu/protocol/mqtt/repositories/SubscribeRepository.java
@@ -18,6 +18,9 @@
package org.apache.shenyu.protocol.mqtt.repositories;
import io.netty.channel.Channel;
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
@@ -32,17 +35,30 @@ import java.util.concurrent.CopyOnWriteArraySet;
*/
public class SubscribeRepository implements BaseRepository<List<String>,
List<Channel>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(SubscribeRepository.class);
+
private static final Map<String, List<Channel>> TOPIC_CHANNEL_FACTORY =
new ConcurrentHashMap<>();
@Override
public void add(final List<String> topics, final List<Channel> channels) {
-
CompletableFuture.runAsync(() -> topics.parallelStream().forEach(s -> {
List<Channel> list = get(s);
list.addAll(channels);
TOPIC_CHANNEL_FACTORY.put(s, list);
}));
+ }
+ /**
+ * add subscribe channel.
+ * @param channel channel
+ * @param mqttTopicSubscription mqtt subscription info
+ */
+ public void add(final Channel channel, final List<MqttTopicSubscription>
mqttTopicSubscription) {
+ CompletableFuture.runAsync(() ->
mqttTopicSubscription.parallelStream().forEach(s -> {
+ List<Channel> channels = get(s.topicName());
+ channels.add(channel);
+ TOPIC_CHANNEL_FACTORY.put(s.topicName(), channels);
+ }));
}
@Override