This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch 2.x in repository https://gitbox.apache.org/repos/asf/incubator-seata.git
The following commit(s) were added to refs/heads/2.x by this push: new b2ff764329 optimize: select channel handles based on protocol versions (#6634) b2ff764329 is described below commit b2ff764329571ed699ed1ee0405616faff7202ff Author: funkye <jian...@apache.org> AuthorDate: Mon Jul 1 15:34:43 2024 +0800 optimize: select channel handles based on protocol versions (#6634) --- changes/en-us/2.x.md | 5 +- changes/zh-cn/2.x.md | 4 ++ .../rpc/netty/AbstractNettyRemotingClient.java | 6 +- .../rpc/netty/AbstractNettyRemotingServer.java | 29 ++------ .../core/rpc/netty/CompatibleProtocolEncoder.java | 79 ---------------------- ...tocolDecoder.java => MultiProtocolDecoder.java} | 46 +++++++++---- .../seata/core/rpc/netty/NettyClientBootstrap.java | 10 +-- .../seata/core/rpc/netty/NettyServerBootstrap.java | 11 ++- .../seata/core/rpc/netty/ProtocolDecoder.java | 3 +- .../seata/core/rpc/netty/v0/ProtocolDecoderV0.java | 41 +++++++++-- .../seata/core/rpc/netty/v0/ProtocolEncoderV0.java | 23 +++++-- .../seata/core/rpc/netty/v1/ProtocolDecoderV1.java | 45 ++++++++++-- .../seata/core/rpc/netty/v1/ProtocolEncoderV1.java | 20 +++++- .../core/rpc/netty/v1/ClientChannelHandler.java | 4 +- .../seata/core/rpc/netty/v1/ProtocolV1Client.java | 13 ++-- .../rpc/netty/v1/ProtocolV1SerializerTest.java | 7 +- .../seata/core/rpc/netty/v1/ProtocolV1Server.java | 59 +++++++--------- .../core/rpc/netty/v1/ServerChannelHandler.java | 6 +- 18 files changed, 213 insertions(+), 198 deletions(-) diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 54c2a2c6eb..7b6b41522b 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -3,7 +3,7 @@ Add changes here for all PR submitted to the 2.x branch. <!-- Please add the `changes` to the following location(feature/bugfix/optimize/test) based on the type of PR --> ### feature: - +- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] multi-version seata protocol support ### bugfix: - [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async annotation not working in ClusterWatcherManager @@ -16,8 +16,10 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the task thread pool for committing and rollbacking statuses - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : load SeataSerializer by version - [[#6209](https://github.com/apache/incubator-seata/pull/6209)] Eliminate RpcMessage and Encoder/Decoder dependencies +- [[#6634](https://github.com/apache/incubator-seata/pull/6634)] select channel handles based on protocol versions - [[#6523](https://github.com/apache/incubator-seata/pull/6523)] upgrade alibaba/druid version to 1.2.20 + ### refactor: - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: send async response @@ -36,6 +38,7 @@ Thanks to these contributors for their code commits. Please report an unintended - [liuqiufeng](https://github.com/liuqiufeng) - [God-Gan](https://github.com/God-Gan) - [Bughue](https://github.com/Bughue) +- [funky-eyes](https://github.com/funky-eyes) - [tanyaofei](https://github.com/tanyaofei) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 3f59e14835..686908c99c 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -3,6 +3,7 @@ <!-- 请根据PR的类型添加 `变更记录` 到以下对应位置(feature/bugfix/optimize/test) 下 --> ### feature: +- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] 支持seata私有协议多版本兼容 ### bugfix: - [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async注解ClusterWatcherManager中不生效的问题 @@ -15,8 +16,10 @@ - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 和 rollbacking 状态的任务线程池 - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化 - [[#6209](https://github.com/apache/incubator-seata/pull/6209)] 解开 RpcMessage 和 Encoder/Decoder 的互相依赖 +- [[#6634](https://github.com/apache/incubator-seata/pull/6634)] 根据协议版本指定channel handle - [[#6523](https://github.com/apache/incubator-seata/pull/6523)] 升级 alibaba/druid 的版本到1.2.20 + ### refactor: - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应 @@ -34,6 +37,7 @@ - [liuqiufeng](https://github.com/liuqiufeng) - [God-Gan](https://github.com/God-Gan) - [Bughue](https://github.com/Bughue) +- [funky-eyes](https://github.com/funky-eyes) - [tanyaofei](https://github.com/tanyaofei) diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 2901eb8d3f..248e8f48f6 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -409,10 +409,8 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - RpcMessage rpcMessage = null; - if (msg instanceof ProtocolRpcMessage) { - rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); - processMessage(ctx, rpcMessage); + if (msg instanceof RpcMessage) { + processMessage(ctx, (RpcMessage)msg); } else { LOGGER.error("rpcMessage type error"); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 9be9e79c3b..4b79f20d95 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -69,7 +69,7 @@ public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting if (channel == null) { throw new RuntimeException("rm client is not connected. dbkey:" + resourceId + ",clientId:" + clientId); } - RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); } @@ -78,7 +78,7 @@ public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting if (channel == null) { throw new RuntimeException("client is not connected"); } - RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout()); } @@ -87,7 +87,7 @@ public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting if (channel == null) { throw new RuntimeException("client is not connected"); } - RpcMessage rpcMessage = buildRequestMessage(channel, msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); + RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); super.sendAsync(channel, rpcMessage); } @@ -98,7 +98,7 @@ public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting clientChannel = ChannelManager.getSameClientChannel(channel); } if (clientChannel != null) { - RpcMessage rpcMsg = buildResponseMessage(channel, rpcMessage, msg, msg instanceof HeartbeatMessage + RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE : ProtocolConstants.MSGTYPE_RESPONSE); super.sendAsync(clientChannel, rpcMsg); @@ -108,21 +108,6 @@ public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting } - private RpcMessage buildResponseMessage(Channel channel, RpcMessage fromRpcMessage, Object msg, byte messageType) { - RpcMessage rpcMessage = super.buildResponseMessage(fromRpcMessage, msg, messageType); - RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); - rpcMessage.setOtherSideVersion(rpcContext.getVersion()); - return rpcMessage; - } - - protected RpcMessage buildRequestMessage(Channel channel, Object msg, byte messageType) { - RpcMessage rpcMessage = super.buildRequestMessage(msg, messageType); - RpcContext rpcContext = ChannelManager.getContextFromIdentified(channel); - rpcMessage.setOtherSideVersion(rpcContext.getVersion()); - return rpcMessage; - } - - @Override public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) { Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor); @@ -179,10 +164,8 @@ public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting */ @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - RpcMessage rpcMessage = null; - if (msg instanceof ProtocolRpcMessage) { - rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); - processMessage(ctx, rpcMessage); + if (msg instanceof RpcMessage) { + processMessage(ctx, (RpcMessage)msg); } else { LOGGER.error("rpcMessage type error"); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java deleted file mode 100644 index e588b92b8e..0000000000 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.seata.core.rpc.netty; - -import com.google.common.collect.ImmutableMap; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; -import org.apache.seata.common.util.StringUtils; -import org.apache.seata.core.protocol.ProtocolConstants; -import org.apache.seata.core.protocol.RpcMessage; -import org.apache.seata.core.protocol.Version; -import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0; -import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -/** - * Compatible Protocol Encoder - * <p> - * <li>Full Length: include all data </li> - * <li>Head Length: include head data from magic code to head map. </li> - * <li>Body Length: Full Length - Head Length</li> - * </p> - */ -public class CompatibleProtocolEncoder extends MessageToByteEncoder { - - private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolEncoder.class); - - private static Map<Byte, ProtocolEncoder> protocolEncoderMap; - - public CompatibleProtocolEncoder() { - super(); - protocolEncoderMap = ImmutableMap.<Byte, ProtocolEncoder>builder() - .put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0()) - .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()) - .build(); - } - - @Override - public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) { - try { - if (msg instanceof RpcMessage) { - RpcMessage rpcMessage = (RpcMessage) msg; - String sdkVersion = rpcMessage.getOtherSideVersion(); - if (StringUtils.isBlank(sdkVersion)) { - sdkVersion = Version.getCurrent(); - } - byte protocolVersion = Version.calcProtocolVersion(sdkVersion); - ProtocolEncoder encoder = protocolEncoderMap.get(protocolVersion); - if (encoder == null) { - throw new UnsupportedOperationException("Unsupported protocolVersion: " + protocolVersion); - } - - encoder.encode(rpcMessage, out); - } else { - throw new UnsupportedOperationException("Not support this class:" + msg.getClass()); - } - } catch (Throwable e) { - LOGGER.error("Encode request error!", e); - } - } -} diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java similarity index 75% rename from core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java rename to core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java index d066984c23..9bd9550369 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/MultiProtocolDecoder.java @@ -18,12 +18,15 @@ package org.apache.seata.core.rpc.netty; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.rpc.netty.v0.ProtocolDecoderV0; +import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0; import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1; +import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,17 +55,26 @@ import java.util.Map; * <li>Body Length: Full Length - Head Length</li> * </p> */ -public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder { +public class MultiProtocolDecoder extends LengthFieldBasedFrameDecoder { - private static final Logger LOGGER = LoggerFactory.getLogger(CompatibleProtocolDecoder.class); - private static Map<Byte, ProtocolDecoder> protocolDecoderMap; + private static final Logger LOGGER = LoggerFactory.getLogger(MultiProtocolDecoder.class); + private final Map<Byte, ProtocolDecoder> protocolDecoderMap; - public CompatibleProtocolDecoder() { + private final Map<Byte, ProtocolEncoder> protocolEncoderMap; + + private final ChannelHandler[] channelHandlers; + + public MultiProtocolDecoder(ChannelHandler... channelHandlers) { // default is 8M - this(ProtocolConstants.MAX_FRAME_LENGTH); + this(ProtocolConstants.MAX_FRAME_LENGTH, channelHandlers); } - public CompatibleProtocolDecoder(int maxFrameLength) { + public MultiProtocolDecoder() { + // default is 8M + this(ProtocolConstants.MAX_FRAME_LENGTH, null); + } + + public MultiProtocolDecoder(int maxFrameLength, ChannelHandler[] channelHandlers) { /* int maxFrameLength, int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 @@ -71,10 +83,13 @@ public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder { int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 */ super(maxFrameLength, 3, 4, -7, 0); - protocolDecoderMap = ImmutableMap.<Byte, ProtocolDecoder>builder() - .put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0()) - .put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()) - .build(); + this.protocolDecoderMap = + ImmutableMap.<Byte, ProtocolDecoder>builder().put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0()) + .put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1()).build(); + this.protocolEncoderMap = + ImmutableMap.<Byte, ProtocolEncoder>builder().put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0()) + .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1()).build(); + this.channelHandlers = channelHandlers; } @Override @@ -93,9 +108,10 @@ public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder { if (decoded instanceof ByteBuf) { frame = (ByteBuf) decoded; + ProtocolDecoder decoder = protocolDecoderMap.get(version); + ProtocolEncoder encoder = protocolEncoderMap.get(version); try { - ProtocolDecoder decoder = protocolDecoderMap.get(version); - if (decoder == null) { + if (decoder == null || encoder == null) { throw new UnsupportedOperationException("Unsupported version: " + version); } return decoder.decodeFrame(frame); @@ -103,6 +119,12 @@ public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder { if (version != ProtocolConstants.VERSION_0) { frame.release(); } + ctx.pipeline().addLast((ChannelHandler)decoder); + ctx.pipeline().addLast((ChannelHandler)encoder); + if (channelHandlers != null) { + ctx.pipeline().addLast(channelHandlers); + } + ctx.pipeline().remove(this); } } } catch (Exception exx) { diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java index 4867f86bcf..4aaafc0acb 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java @@ -35,6 +35,8 @@ import io.netty.util.internal.PlatformDependent; import org.apache.seata.common.exception.FrameworkException; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.core.rpc.RemotingBootstrap; +import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1; +import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,12 +130,12 @@ public class NettyClientBootstrap implements RemotingBootstrap { @Override public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast( - new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), + pipeline + .addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(), nettyClientConfig.getChannelMaxWriteIdleSeconds(), nettyClientConfig.getChannelMaxAllIdleSeconds())) - .addLast(new CompatibleProtocolDecoder()) - .addLast(new CompatibleProtocolEncoder()); + .addLast(new ProtocolDecoderV1()) + .addLast(new ProtocolEncoderV1()); if (channelHandlers != null) { addChannelPipelineLast(ch, channelHandlers); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java index b847b2a96d..c7b2aa57c2 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java @@ -158,13 +158,10 @@ public class NettyServerBootstrap implements RemotingBootstrap { .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { - ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) - .addLast(new CompatibleProtocolDecoder()) - .addLast(new CompatibleProtocolEncoder()); - if (channelHandlers != null) { - addChannelPipelineLast(ch, channelHandlers); - } - + MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(channelHandlers); + ch.pipeline() + .addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0)) + .addLast(multiProtocolDecoder); } }); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java index 42a7c75c04..d28506fd84 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java @@ -17,6 +17,7 @@ package org.apache.seata.core.rpc.netty; import io.netty.buffer.ByteBuf; +import org.apache.seata.core.protocol.RpcMessage; /** * the protocol decoder @@ -24,6 +25,6 @@ import io.netty.buffer.ByteBuf; **/ public interface ProtocolDecoder { - ProtocolRpcMessage decodeFrame(ByteBuf in); + RpcMessage decodeFrame(ByteBuf in); } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java index 42e112a2f6..d14ce91cea 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java @@ -17,9 +17,13 @@ package org.apache.seata.core.rpc.netty.v0; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.rpc.netty.ProtocolDecoder; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.serializer.SerializerServiceLoader; @@ -53,13 +57,23 @@ import org.slf4j.LoggerFactory; * * @see ProtocolEncoderV0 */ -public class ProtocolDecoderV0 implements ProtocolDecoder { +public class ProtocolDecoderV0 extends LengthFieldBasedFrameDecoder implements ProtocolDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV0.class); + public ProtocolDecoderV0() { + /* + int maxFrameLength, + int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 + int lengthFieldLength, FullLength is int(4B). so values is 4 + int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 + int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 + */ + super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4, -7, 0); + } @Override - public ProtocolRpcMessageV0 decodeFrame(ByteBuf in) { + public RpcMessage decodeFrame(ByteBuf in) { ProtocolRpcMessageV0 rpcMessage = new ProtocolRpcMessageV0(); if (in.readableBytes() < ProtocolConstantsV0.HEAD_LENGTH) { throw new IllegalArgumentException("Nothing to decode."); @@ -93,7 +107,7 @@ public class ProtocolDecoderV0 implements ProtocolDecoder { rpcMessage.setBody(HeartbeatMessage.PONG); } - return rpcMessage; + return rpcMessage.protocolMsg2RpcMsg(); } if (bodyLength > 0 && in.readableBytes() < bodyLength) { @@ -125,8 +139,27 @@ public class ProtocolDecoderV0 implements ProtocolDecoder { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Receive:" + rpcMessage.getBody() + ", messageId:" + msgId); } - return rpcMessage; + return rpcMessage.protocolMsg2RpcMsg(); } + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + Object decoded; + try { + decoded = super.decode(ctx, in); + if (decoded instanceof ByteBuf) { + ByteBuf frame = (ByteBuf)decoded; + try { + return decodeFrame(frame); + } finally { + frame.release(); + } + } + } catch (Exception exx) { + LOGGER.error("Decode frame error, cause: {}", exx.getMessage()); + throw new DecodeException(exx); + } + return decoded; + } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java index 3fc447b281..f217a84329 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java @@ -17,6 +17,8 @@ package org.apache.seata.core.rpc.netty.v0; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.MessageTypeAware; import org.apache.seata.core.protocol.ProtocolConstants; @@ -54,7 +56,7 @@ import org.slf4j.LoggerFactory; * * @see ProtocolDecoderV0 */ -public class ProtocolEncoderV0 implements ProtocolEncoder { +public class ProtocolEncoderV0 extends MessageToByteEncoder implements ProtocolEncoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolEncoderV0.class); @@ -67,9 +69,9 @@ public class ProtocolEncoderV0 implements ProtocolEncoder { out.writeShort(ProtocolConstantsV0.MAGIC); int flag = (msg.isAsync() ? ProtocolConstantsV0.FLAG_ASYNC : 0) - | (msg.isHeartbeat() ? ProtocolConstantsV0.FLAG_HEARTBEAT : 0) - | (msg.isRequest() ? ProtocolConstantsV0.FLAG_REQUEST : 0) - | (msg.isSeataCodec() ? ProtocolConstantsV0.FLAG_SEATA_CODEC : 0); + | (msg.isHeartbeat() ? ProtocolConstantsV0.FLAG_HEARTBEAT : 0) + | (msg.isRequest() ? ProtocolConstantsV0.FLAG_REQUEST : 0) + | (msg.isSeataCodec() ? ProtocolConstantsV0.FLAG_SEATA_CODEC : 0); out.writeShort((short) flag); @@ -103,4 +105,17 @@ public class ProtocolEncoderV0 implements ProtocolEncoder { LOGGER.error("Encode request error!", e); } } + + @Override + protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + try { + if (msg instanceof RpcMessage) { + encode((RpcMessage)msg, out); + } else { + throw new UnsupportedOperationException("Not support this class:" + msg.getClass()); + } + } catch (Throwable e) { + LOGGER.error("Encode request error!", e); + } + } } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java index 9ca4977944..ce48b3ce8c 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java @@ -20,12 +20,15 @@ import java.util.List; import java.util.Map; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.seata.core.compressor.Compressor; import org.apache.seata.core.compressor.CompressorFactory; +import org.apache.seata.core.exception.DecodeException; import org.apache.seata.core.protocol.HeartbeatMessage; import org.apache.seata.core.protocol.ProtocolConstants; +import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.rpc.netty.ProtocolDecoder; -import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.serializer.SerializerServiceLoader; import org.apache.seata.core.serializer.SerializerType; @@ -61,23 +64,32 @@ import org.slf4j.LoggerFactory; * @see ProtocolEncoderV1 * @since 0.7.0 */ -public class ProtocolDecoderV1 implements ProtocolDecoder { +public class ProtocolDecoderV1 extends LengthFieldBasedFrameDecoder implements ProtocolDecoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDecoderV1.class); private final List<SerializerType> supportDeSerializerTypes; public ProtocolDecoderV1() { + /* + int maxFrameLength, + int lengthFieldOffset, magic code is 2B, and version is 1B, and then FullLength. so value is 3 + int lengthFieldLength, FullLength is int(4B). so values is 4 + int lengthAdjustment, FullLength include all data and read 7 bytes before, so the left length is (FullLength-7). so values is -7 + int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0 + */ + super(ProtocolConstants.MAX_FRAME_LENGTH, 3, 4, -7, 0); supportDeSerializerTypes = SerializerServiceLoader.getSupportedSerializers(); if (supportDeSerializerTypes.isEmpty()) { throw new IllegalArgumentException("No serializer found"); - } } + } + } @Override - public ProtocolRpcMessage decodeFrame(ByteBuf frame) { + public RpcMessage decodeFrame(ByteBuf frame) { byte b0 = frame.readByte(); byte b1 = frame.readByte(); if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 - || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) { + || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) { throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1); } @@ -125,6 +137,27 @@ public class ProtocolDecoderV1 implements ProtocolDecoder { } } - return rpcMessage; + return rpcMessage.protocolMsg2RpcMsg(); } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + Object decoded; + try { + decoded = super.decode(ctx, in); + if (decoded instanceof ByteBuf) { + ByteBuf frame = (ByteBuf)decoded; + try { + return decodeFrame(frame); + } finally { + frame.release(); + } + } + } catch (Exception exx) { + LOGGER.error("Decode frame error, cause: {}", exx.getMessage()); + throw new DecodeException(exx); + } + return decoded; + } + } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java index 14cbcdb55d..dd01b948db 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java @@ -17,6 +17,8 @@ package org.apache.seata.core.rpc.netty.v1; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; import org.apache.seata.core.rpc.netty.ProtocolEncoder; import org.apache.seata.core.serializer.Serializer; import org.apache.seata.core.compressor.Compressor; @@ -57,7 +59,7 @@ import java.util.Map; * @see ProtocolDecoderV1 * @since 0.7.0 */ -public class ProtocolEncoderV1 implements ProtocolEncoder { +public class ProtocolEncoderV1 extends MessageToByteEncoder implements ProtocolEncoder { private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolEncoderV1.class); @@ -91,7 +93,7 @@ public class ProtocolEncoderV1 implements ProtocolEncoder { byte[] bodyBytes = null; if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST - && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { + && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { // heartbeat has no body Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), ProtocolConstants.VERSION_1); bodyBytes = serializer.serialize(rpcMessage.getBody()); @@ -119,4 +121,18 @@ public class ProtocolEncoderV1 implements ProtocolEncoder { throw e; } } + + @Override + protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { + try { + if (msg instanceof RpcMessage) { + this.encode((RpcMessage)msg, out); + } else { + throw new UnsupportedOperationException("Not support this class:" + msg.getClass()); + } + } catch (Throwable e) { + LOGGER.error("Encode request error!", e); + } + } + } diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java index e35c124e30..14709e5f06 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java @@ -50,8 +50,8 @@ public class ClientChannelHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof ProtocolRpcMessage) { - RpcMessage rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); + if (msg instanceof RpcMessage) { + RpcMessage rpcMessage = (RpcMessage)msg; int msgId = rpcMessage.getId(); DefaultPromise future = (DefaultPromise) client.futureMap.remove(msgId); if (future != null) { diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java index 3f52ed63c5..a1fb25e1e3 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java @@ -45,8 +45,6 @@ import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.transaction.BranchCommitRequest; import org.apache.seata.core.serializer.SerializerType; -import org.apache.seata.core.rpc.netty.CompatibleProtocolDecoder; -import org.apache.seata.core.rpc.netty.CompatibleProtocolEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,8 +80,9 @@ public class ProtocolV1Client { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast(new CompatibleProtocolEncoder()); - pipeline.addLast(new CompatibleProtocolDecoder(8 * 1024 * 1024)); + pipeline + .addLast(new ProtocolDecoderV1()) + .addLast(new ProtocolEncoderV1()); pipeline.addLast(new ClientChannelHandler(ProtocolV1Client.this)); } }); @@ -95,13 +94,13 @@ public class ProtocolV1Client { } else { Throwable cause = channelFuture.cause(); throw new RuntimeException("Failed to connect " + host + ":" + port + - (cause != null ? ". Cause by: " + cause.getMessage() : ".")); + (cause != null ? ". Cause by: " + cause.getMessage() : ".")); } } private EventLoopGroup createWorkerGroup() { NamedThreadFactory threadName = - new NamedThreadFactory("CLI-WORKER", false); + new NamedThreadFactory("CLI-WORKER", false); return new NioEventLoopGroup(10, threadName); } @@ -158,7 +157,7 @@ public class ProtocolV1Client { final AtomicLong cnt = new AtomicLong(0); // no queue final ThreadPoolExecutor service1 = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, - new SynchronousQueue<Runnable>(), new NamedThreadFactory("client-", false)); + new SynchronousQueue<Runnable>(), new NamedThreadFactory("client-", false)); for (int i = 0; i < threads; i++) { service1.execute(() -> { while (true) { diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java index 5ee0df7dee..ac75db20c5 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java @@ -27,8 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.seata.common.thread.NamedThreadFactory; import org.apache.seata.core.model.BranchType; +import org.apache.seata.core.protocol.RpcMessage; import org.apache.seata.core.protocol.transaction.BranchCommitRequest; -import org.apache.seata.core.rpc.netty.ProtocolRpcMessage; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -80,7 +80,7 @@ public class ProtocolV1SerializerTest { while (tag.getAndIncrement() < runTimes) { try { Future future = client.sendRpc(head, body); - ProtocolRpcMessage resp = (ProtocolRpcMessage) future.get(10, TimeUnit.SECONDS); + RpcMessage resp = (RpcMessage)future.get(10, TimeUnit.SECONDS); if (resp != null) { success.incrementAndGet(); } @@ -93,9 +93,10 @@ public class ProtocolV1SerializerTest { }); } - cnt.await(); + cnt.await(10,TimeUnit.SECONDS); LOGGER.info("success {}/{}", success.get(), runTimes); Assertions.assertEquals(success.get(), runTimes); + service1.shutdown(); } catch (InterruptedException e) { LOGGER.error("Thread interrupted", e); } finally { diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java index 20a2fdbcc5..4c7565eedb 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java @@ -35,12 +35,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import org.apache.seata.common.thread.NamedThreadFactory; -import org.apache.seata.common.thread.NamedThreadFactory; -import org.apache.seata.core.rpc.netty.CompatibleProtocolDecoder; -import org.apache.seata.core.rpc.netty.CompatibleProtocolEncoder; - -import java.net.InetSocketAddress; -import java.util.concurrent.TimeUnit; +import org.apache.seata.core.rpc.netty.MultiProtocolDecoder; /** */ @@ -58,37 +53,31 @@ public class ProtocolV1Server { workerGroup = createWorkerGroup(); serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) - .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT) - .option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT) - .childOption(ChannelOption.SO_KEEPALIVE, true) - .childOption(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.SO_RCVBUF, 8192 * 128) - .childOption(ChannelOption.SO_SNDBUF, 8192 * 128) - .handler(new LoggingHandler(LogLevel.DEBUG)) - .childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT) - .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark( - 8192, 31768)) - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(Channel channel) throws Exception { - ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast(new CompatibleProtocolDecoder(8 * 1024 * 1024)); - pipeline.addLast(new CompatibleProtocolEncoder()); - pipeline.addLast(new ServerChannelHandler()); - } - }); + .channel(NioServerSocketChannel.class) + .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT) + .option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_RCVBUF, 8192 * 128) + .childOption(ChannelOption.SO_SNDBUF, 8192 * 128) + .handler(new LoggingHandler(LogLevel.DEBUG)) + .childOption(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT) + .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark( + 8192, 31768)) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast(new MultiProtocolDecoder(new ServerChannelHandler())); + } + }); String host = "0.0.0.0"; ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(host, port)); - ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - throw new RuntimeException("Server start fail !", future.cause()); - } + ChannelFuture channelFuture = future.addListener((ChannelFutureListener)future1 -> { + if (!future1.isSuccess()) { + throw new RuntimeException("Server start fail !", future1.cause()); } }); @@ -111,13 +100,13 @@ public class ProtocolV1Server { private EventLoopGroup createBossGroup() { NamedThreadFactory threadName = - new NamedThreadFactory("SEV-BOSS-" + port, false); + new NamedThreadFactory("SEV-BOSS-" + port, false); return new NioEventLoopGroup(2, threadName); } private EventLoopGroup createWorkerGroup() { NamedThreadFactory threadName = - new NamedThreadFactory("SEV-WORKER-" + port, false); + new NamedThreadFactory("SEV-WORKER-" + port, false); return new NioEventLoopGroup(10, threadName); } diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java index 8b468d0e8f..9a8fff0582 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java @@ -39,10 +39,8 @@ public class ServerChannelHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { Channel channel = ctx.channel(); - - if (msg instanceof ProtocolRpcMessage) { - RpcMessage rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg(); - channel.writeAndFlush(rpcMessage); + if (msg instanceof RpcMessage) { + channel.writeAndFlush(msg); } else { LOGGER.error("rpcMessage type error"); } --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org