This is an automated email from the ASF dual-hosted git repository. jianbin pushed a commit to branch docusaurus in repository https://gitbox.apache.org/repos/asf/incubator-seata-website.git
The following commit(s) were added to refs/heads/docusaurus by this push: new 112ff24653 blog : rpc multi-protocol (#877) 112ff24653 is described below commit 112ff24653ca47f5fde1744d0ed2ba2de181e9a0 Author: justabug <bug...@users.noreply.github.com> AuthorDate: Tue Aug 20 17:15:19 2024 +0800 blog : rpc multi-protocol (#877) --- .../seata-rpc-multi-protocol01.md | 202 ++++++++++++++ .../seata-rpc-multi-protocol02.md | 300 +++++++++++++++++++++ .../img/blog/rpc_multi-protocol/00-netty-layer.png | Bin 0 -> 156184 bytes .../img/blog/rpc_multi-protocol/01-netty-class.jpg | Bin 0 -> 47459 bytes .../img/blog/rpc_multi-protocol/02-processor.jpg | Bin 0 -> 107400 bytes .../blog/rpc_multi-protocol/03-netty-handler.jpg | Bin 0 -> 57380 bytes static/img/blog/rpc_multi-protocol/04-protocol.jpg | Bin 0 -> 64571 bytes .../rpc_multi-protocol/05-encode-decode-class.jpg | Bin 0 -> 52913 bytes .../blog/rpc_multi-protocol/06-multi-protocol.png | Bin 0 -> 328324 bytes 9 files changed, 502 insertions(+) diff --git a/i18n/zh-cn/docusaurus-plugin-content-blog/seata-rpc-multi-protocol01.md b/i18n/zh-cn/docusaurus-plugin-content-blog/seata-rpc-multi-protocol01.md new file mode 100644 index 0000000000..1fc0c3c1a7 --- /dev/null +++ b/i18n/zh-cn/docusaurus-plugin-content-blog/seata-rpc-multi-protocol01.md @@ -0,0 +1,202 @@ +--- +title: Seata的RPC通信源码分析01:传输篇 +author: 谢明华 +keywords: [Seata、RPC模块、协议 ] +date: 2024/08/15 +--- +# Seata的RPC通信源码分析01:传输篇 + +## 引言和概述 + +在分布式系统中,通信协议的设计直接影响系统的可靠性和可扩展性。Apache Seata的RPC通信协议为各组件间的数据传输提供了基础,对这方面的源码分析是深入理解seata的又一个好方式。在最近的2.2.0版本里,我为Seata的通信机制进行了重构,以支持多版本协议的兼容性,现在改造完成了,我将从传输机制和通信协议两个方面去分析新版本里源码。 +本文是第一篇,介绍Seata传输机制。 + +seata里的RPC通信主角是`TC`、`TM`、`RM`三者,当然过程中还可能会涉及注册中心甚至配置中心等其他网络交互,但这些相对内容所使用的通信机制是相对独立的,本文不作讨论。 + +接下来我将按照我最早了解源码时的几个直觉提问,带大家进行探索。 + +## Seata中的Netty(谁在传输) +第一个问题:seata通信的底层是什么在进行发送请求报文和接收请求报文?答案是netty,而netty在seata里面是如何工作的呢?我们将去到core包的org.apache.seata.core.rpc.netty去探索 + +<img src="/img/blog/rpc_multi-protocol/01-netty-class.jpg" width="700px" /> + +从这个继承关系我们可以看到,`AbstractNettyRemoting`作为核心的父类,RM和TM和Server(TC)都实现了他,实际上这个类里面已经实现了核心的发送和接收 + +在`sendSync`实现了同步发送逻辑,异步发送`sendAsync`的逻辑相似且更简单这里不再重复,只要拿到channel进行发送即可 +```java +protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException { + // 此处省略非关键代码 + + MessageFuture messageFuture = new MessageFuture(); + messageFuture.setRequestMessage(rpcMessage); + messageFuture.setTimeout(timeoutMillis); + futures.put(rpcMessage.getId(), messageFuture); + + channelWritableCheck(channel, rpcMessage.getBody()); + + String remoteAddr = ChannelUtil.getAddressFromChannel(channel); + doBeforeRpcHooks(remoteAddr, rpcMessage); + + // netty的写方法(netty write) + channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> { + if (!future.isSuccess()) { + MessageFuture messageFuture1 = futures.remove(rpcMessage.getId()); + if (messageFuture1 != null) { + messageFuture1.setResultMessage(future.cause()); + } + destroyChannel(future.channel()); + } + }); + + try { + Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); + doAfterRpcHooks(remoteAddr, rpcMessage, result); + return result; + } catch (Exception exx) { + // 此处省略非关键代码 + } + } +``` +而接收报文的方式,主要在processMessage方法里,这个方法被`AbstractNettyRemotingClient.ClientHandler`和`AbstractNettyRemotingServer.ServerHandler`这两个类的channelRead调用,这两个内部类都是`ChannelDuplexHandler`的子类,他们各自注册在client和server的Bootstrap里(为什么注册到bootstrap就能进行接收操作?这个要移步netty的原理) + +<img src="/img/blog/rpc_multi-protocol/03-netty-handler.jpg" width="700px" /> + +收到信息后就会调进父类的`processMessage`方法里,我们来看看源码 +```java +protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + // 此处省略非关键代码 + Object body = rpcMessage.getBody(); + if (body instanceof MessageTypeAware) { + MessageTypeAware messageTypeAware = (MessageTypeAware) body; + final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode()); + if (pair != null) { + // 这里可读性稍微差点,first是Processor,表示普通处理,而second是线程池,表示池化处理。 + if (pair.getSecond() != null) { + try { + pair.getSecond().execute(() -> { + try { + pair.getFirst().process(ctx, rpcMessage); + } catch (Throwable th) { + LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); + } finally { + MDC.clear(); + } + }); + } catch (RejectedExecutionException e) { + // 此处省略非关键代码 + } + } else { + try { + pair.getFirst().process(ctx, rpcMessage); + } catch (Throwable th) { + LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); + } + } + } else { + LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode()); + } + } else { + LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body); + } + } +``` +实际上这些processor和executor是client和server注册进来的处理器:下面是一部分的处理器,他们对应着不同的MessageType,以下是部分处理器的注册举例(他们在NettyRemotingServer#registerProcessor) +```java + super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor); + super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor); + super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor); + super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null); +``` +可以看到这些processor实际上就是seata各种提交回滚等等的处理器 + +## Seata中的NettyChannel(channel怎么管理) +那第二个问题,既然上面是netty依靠着channel在进行着收发,那这个channel怎么来呢?会一直持有吗?如果断了怎么重连?答案在`ChannelManager`和上面的两个reg的`processor`。 + +当RM/TM取得了server的地址,进行注册的时候(第一次通信),如果server能成功解析报文并发现是REG信息,就会进入`regRmProcessor`/`regTmProcessor`,这里以TM为例子 +```java +// server接收 RegTmProcessor + private void onRegTmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) { + RegisterTMRequest message = (RegisterTMRequest) rpcMessage.getBody(); + String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress()); + Version.putChannelVersion(ctx.channel(), message.getVersion()); + boolean isSuccess = false; + String errorInfo = StringUtils.EMPTY; + try { + if (null == checkAuthHandler || checkAuthHandler.regTransactionManagerCheckAuth(message)) { + // 在ChannelManager中注册channel,这里可以预见到注册之后,server进行sendSync(channel,xxx)的时候就可以拿到这个channel了 + ChannelManager.registerTMChannel(message, ctx.channel()); + Version.putChannelVersion(ctx.channel(), message.getVersion()); + isSuccess = true; + } + } catch (Exception exx) { + isSuccess = false; + errorInfo = exx.getMessage(); + LOGGER.error("TM register fail, error message:{}", errorInfo); + } + RegisterTMResponse response = new RegisterTMResponse(isSuccess); + // 异步回复 + remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response); + // 此处省略非关键代码 + } + +// ChannelManager + public static void registerTMChannel(RegisterTMRequest request, Channel channel) + throws IncompatibleVersionException { + RpcContext rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.TMROLE, request.getVersion(), + request.getApplicationId(), + request.getTransactionServiceGroup(), + null, channel); + rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS); + String clientIdentified = rpcContext.getApplicationId() + Constants.CLIENT_ID_SPLIT_CHAR + ChannelUtil.getClientIpFromChannel(channel); + ConcurrentMap<Integer, RpcContext> clientIdentifiedMap = CollectionUtils.computeIfAbsent(TM_CHANNELS, clientIdentified, key -> new ConcurrentHashMap<>()); + rpcContext.holdInClientChannels(clientIdentifiedMap); + } +``` +在ChannelManager里管理着`RM_CHANNELS`和`RM_CHANNELS`两个比较复杂的map,特别是RM_CHANNELS里面有4层(resourceId -> applicationId -> ip -> port -> RpcContext) + +说完了server对channel的管理,那client呢?这个map管理更简单一些,就是注册成功后在onRegisterMsgSuccess里面也用一个`NettyClientChannelManager`里registerChannel,后续跟server交互尽量都用这个channel。 + +那么第三个问题又来了,client的channel不可用了可以自行新建,可是server接收后发现这是新channel怎么办?或者server在异步回复的时候发现channel不可用了怎么办? +答案依然在`NettyClientChannelManager`,这里面相对复杂的是,client方面需要用到channel的时候,实际上由一个对象池`nettyClientKeyPool`管理着,这是个apache的objectPool,所以当channel不可用时也会由这个池子去新增并在使用完后入池。这个对象池实际上是一直持有着`RegisterTMRequest`,跟第一次进来时一样,每次创建需要创建channel的时候,实际上都发生了一次注册 +```java +// NettyClientChannelManager + public Channel makeObject(NettyPoolKey key) { + InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress()); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("NettyPool create channel to " + key); + } + Channel tmpChannel = clientBootstrap.getNewChannel(address); + Object response; + Channel channelToServer = null; + // key持有RegisterTMRequest + if (key.getMessage() == null) { + throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name()); + } + try { + // 实际上是在进行reg操作 + response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage()); + if (!isRegisterSuccess(response, key.getTransactionRole())) { + rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage()); + } else { + channelToServer = tmpChannel; + // 成功后会入池 + rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage()); + } + } + // 此处省略非关键代码 + + return channelToServer; + } +``` + +## 最后 +这一篇我们了解了seata是怎样借助netty来传输数据的,为了更好的看懂netty处理的全貌,我画了个层级图 + +<img src="/img/blog/rpc_multi-protocol/00-netty-layer.png" width="700px" /> + +上面已经讲了请求发送时,serverHandler/clientHandler和NettyRemoting(包括RM、TM、TC)的处理,知道了从外部到netty处理器再到内部的DefaultCoodinator的过程,但我们还缺Decoder/Encoder没讲,这里面会进行协议的解析/封装,也会进行序列化和反序列化,请看 [Seata的RPC通信源码分析02:协议篇](seata-rpc-multi-protocol02.md) + + diff --git a/i18n/zh-cn/docusaurus-plugin-content-blog/seata-rpc-multi-protocol02.md b/i18n/zh-cn/docusaurus-plugin-content-blog/seata-rpc-multi-protocol02.md new file mode 100644 index 0000000000..1ca63e29bb --- /dev/null +++ b/i18n/zh-cn/docusaurus-plugin-content-blog/seata-rpc-multi-protocol02.md @@ -0,0 +1,300 @@ +--- +title: Seata的RPC通信源码分析02:协议篇(多版本协议) +author: 谢明华 +keywords: [Seata、RPC模块、协议 ] +date: 2024/08/15 +--- +# Seata的RPC通信源码分析02:协议篇(多版本协议) + +### 引言和概述 + +上一篇[Seata的RPC通信源码分析01:传输篇](seata-rpc-multi-protocol01.md)已经介绍了RPC通信的传输机制,这一篇我们继续来看协议部分的内容,把这个图里没解析清楚的encode/decode部分给补充完整。 + +<img src="/img/blog/rpc_multi-protocol/00-netty-layer.png" width="700px" /> + +同样的,我们以提问来深入的方式去探究它。在本文中,我们不仅要了解二进制如何解析成rpcMsg类型,还要知道如何兼容不同版本的协议,那么第一个问题:协议长什么样? + +## 协议结构 +<img src="/img/blog/rpc_multi-protocol/04-protocol.jpg" width="900px" /> + +上图展示了协议在0.7.1之前和之后的变化,(在ProtocolDecoderV1的注释也可以看到,更旧版本的要看ProtocolV1Decoder),可以看到新版本的有以下这些构成部分 +- magic-code:0xdada +- protocal-verson:版本号 +- full-length:总长度 +- head-length:头部长度 +- msgtype:消息类型 +- serializer/codecType:序列化方式 +- compress:压缩方式 +- requestid:请求id + +这里我们说明一下seata各版本的server之间对协议的处理差异 +- version<0.7.1 : 只能处理v0版本的协议(上图中的上半部分,带有flag段的),无法识别其他版本协议 +- 0.7.1<=version<2.2.0 : 只能处理v1版本的协议(上图中的下半部分),无法识别其他版本协议 +- version>=2.2.0 : 可以同时识别v0和v1版本的协议,并处理 + +那么2.2.0是怎样做到兼容的呢?先卖个关子,在说明这个之前我们先看看v1的encoder和decoder分别都是怎样运作的。需要注意的是,和之前提到的传输机制一样,协议处理也是client和server共用的,所以下面提到的都是通用逻辑。 + +## 从ByteBuf到RpcMessage(Encoder/Decoder做了什么) +先来看`ProtocolDecoderV1` +```java + public RpcMessage decodeFrame(ByteBuf frame) { + byte b0 = frame.readByte(); + byte b1 = frame.readByte(); + + // 获取version + byte version = frame.readByte(); + // 获取header和body以外的字段 + int fullLength = frame.readInt(); + short headLength = frame.readShort(); + byte messageType = frame.readByte(); + byte codecType = frame.readByte(); + byte compressorType = frame.readByte(); + int requestId = frame.readInt(); + + ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1(); + rpcMessage.setCodec(codecType); + rpcMessage.setId(requestId); + rpcMessage.setCompressor(compressorType); + rpcMessage.setMessageType(messageType); + + // 头部信息 + int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH; + if (headMapLength > 0) { + Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength); + rpcMessage.getHeadMap().putAll(map); + } + + // 如果是心跳信息不需要对body进行序列化 + if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) { + rpcMessage.setBody(HeartbeatMessage.PING); + } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) { + rpcMessage.setBody(HeartbeatMessage.PONG); + } else { + int bodyLength = fullLength - headLength; + if (bodyLength > 0) { + byte[] bs = new byte[bodyLength]; + frame.readBytes(bs); + // 根据刚才得到的compressorType来按需做解压处理 + Compressor compressor = CompressorFactory.getCompressor(compressorType); + bs = compressor.decompress(bs); + SerializerType protocolType = SerializerType.getByCode(rpcMessage.getCodec()); + if (this.supportDeSerializerTypes.contains(protocolType)) { + // 序列化器,由于这个是v1专用的ProtocolDecoderV1,所以可以直接传入version1 + Serializer serializer = SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1); + rpcMessage.setBody(serializer.deserialize(bs)); + } else { + throw new IllegalArgumentException("SerializerType not match"); + } + } + } + return rpcMessage.protocolMsg2RpcMsg(); + } +``` +由于encode操作正好和decode操作相反,这里不再重复介绍,我们继续看里面的serialize操作。上面的serialize类来自`SerializerServiceLoader` +```java + public static Serializer load(SerializerType type, byte version) throws EnhancedServiceNotFoundException { + // 先处理PROTOBUF方式的序列化,通过反射工具取得 + if (type == SerializerType.PROTOBUF) { + try { + ReflectionUtil.getClassByName(PROTOBUF_SERIALIZER_CLASS_NAME); + } catch (ClassNotFoundException e) { + throw new EnhancedServiceNotFoundException("'ProtobufSerializer' not found. " + + "Please manually reference 'org.apache.seata:seata-serializer-protobuf' dependency ", e); + } + } + + String key = serialzerKey(type, version); + // 这里是一个SERIALIZER_MAP,相当于序列化类的缓存。为什么需要缓存,因为SeataSerializer的scope = Scope.PROTOTYPE,防止多次创建类 + Serializer serializer = SERIALIZER_MAP.get(key); + if (serializer == null) { + if (type == SerializerType.SEATA) { + // 这里是seata的SPI机制,本文不再往里深入加载类的逻辑,只需要知道去加载Serializer这个接口,并且把version给到了构造方法 + serializer = EnhancedServiceLoader.load(Serializer.class, type.name(), new Object[]{version}); + } else { + serializer = EnhancedServiceLoader.load(Serializer.class, type.name()); + } + SERIALIZER_MAP.put(key, serializer); + } + return serializer; + } + + // 这里是SeataSerializer构造方法,里面是单例模式的构造器,因为现在是两个版本各一个类,也可以说是双例 + public SeataSerializer(Byte version) { + if (version == ProtocolConstants.VERSION_0) { + versionSeataSerializer = SeataSerializerV0.getInstance(); + } else if (version == ProtocolConstants.VERSION_1) { + versionSeataSerializer = SeataSerializerV1.getInstance(); + } + if (versionSeataSerializer == null) { + throw new UnsupportedOperationException("version is not supported"); + } + } +``` +这样,decoder就得到了一个Serializer,程序运行到`rpcMessage.setBody(serializer.deserialize(bs))`,我们来看看deserialize是怎样处理的 +```java + public <T> T deserialize(byte[] bytes) { + return deserializeByVersion(bytes, ProtocolConstants.VERSION_0); + } + private static <T> T deserializeByVersion(byte[] bytes, byte version) { + //前面是合法性判断,此处忽略 + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + short typecode = byteBuffer.getShort(); + ByteBuffer in = byteBuffer.slice(); + //创建父类,并根据版本号创建Codec + AbstractMessage abstractMessage = MessageCodecFactory.getMessage(typecode); + MessageSeataCodec messageCodec = MessageCodecFactory.getMessageCodec(typecode, version); + //codec的decode操作 + messageCodec.decode(abstractMessage, in); + return (T) abstractMessage; + } +``` +很遗憾,这个serialize并没有太多逻辑,关键还是在MessageCodecFactory和Codec,我们继续往里看。可以看到`MessageCodecFactory`内容不少,但形式单一,都是根据MessageType返回message和codec,所以这里不再展示factory的内容,我们直接看message和codec,也就是`messageCodec.decode(abstractMessage, in)`,虽然codec类型还是很多,但我们可以看到他们的结构都是相似的,逐个字段解析: +```java + // BranchRegisterRequestCodec的decode,这个请求是注册一个事务分支 + public <T> void decode(T t, ByteBuffer in) { + BranchRegisterRequest branchRegisterRequest = (BranchRegisterRequest)t; + + // 解析xid + short xidLen = in.getShort(); + if (xidLen > 0) { + byte[] bs = new byte[xidLen]; + in.get(bs); + branchRegisterRequest.setXid(new String(bs, UTF8)); + } + // 解析branchType + branchRegisterRequest.setBranchType(BranchType.get(in.get())); + short len = in.getShort(); + if (len > 0) { + byte[] bs = new byte[len]; + in.get(bs); + branchRegisterRequest.setResourceId(new String(bs, UTF8)); + } + // 解析lockKey + int iLen = in.getInt(); + if (iLen > 0) { + byte[] bs = new byte[iLen]; + in.get(bs); + branchRegisterRequest.setLockKey(new String(bs, UTF8)); + } + // 解析applicationData + int applicationDataLen = in.getInt(); + if (applicationDataLen > 0) { + byte[] bs = new byte[applicationDataLen]; + in.get(bs); + branchRegisterRequest.setApplicationData(new String(bs, UTF8)); + } + } +``` +好了,到这里,我们已经得到了branchRegisterRequest,可以愉快地交给TCInboundHandler处理了。 + +但是问题又来了,我们只看到client(RM/TM)有以下这种添加encoder/decoder的代码,也就是我们知道client都使用当前版本的encoder/decoder处理: +```java + bootstrap.handler( + new ChannelInitializer<SocketChannel>() { + @Override + public void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),nettyClientConfig.getChannelMaxWriteIdleSeconds(),nettyClientConfig.getChannelMaxAllIdleSeconds())) + .addLast(new ProtocolDecoderV1()) + .addLast(new ProtocolEncoderV1()); + if (channelHandlers != null) { + addChannelPipelineLast(ch, channelHandlers); + } + } + }); +``` +但server如何处理?还有说好的多版本协议呢? + +## 多版本协议(版本识别和绑定) +我们先来看encoder/decoder的一个类图: + +<img src="/img/blog/rpc_multi-protocol/05-encode-decode-class.jpg" width="800px" /> + +ProtocolDecoderV1我们已经分析完了,ProtocolEncoderV1是反向操作,应该比较好理解,至于ProtocolDecoderV0和ProtocolEncoderV0,从图上也可以看到他们和v1是平行关系,除了v0的操作(虽然目前为止我们还没让他派上用场),他们都是netty里典型的encode和decode的子类,但MultiProtocolDecoder又是什么?他是多版本协议的主角,而且在启动的时候已经注册进server的bootstrap。 +```java + protected boolean isV0(ByteBuf in) { + boolean isV0 = false; + in.markReaderIndex(); + byte b0 = in.readByte(); + byte b1 = in.readByte(); + // 实际上,识别协议就靠第3个byte(b2),只要是正常的新版本,b2就是大于0的版本号,而对于0.7以下的版本来说,b2是FLAG的第一位,正好无论是哪种情况他都是0 + // v1/v2/v3 : b2 = version + // v0 : b2 = 0 ,1st byte in FLAG(2byte:0x10/0x20/0x40/0x80) + byte b2 = in.readByte(); + if (ProtocolConstants.MAGIC_CODE_BYTES[0] == b0 && ProtocolConstants.MAGIC_CODE_BYTES[1] == b1 && 0 == b2) { + isV0 = true; + } + // 读完的字节还要吐回去,为了让各版本的decoder能从头解析 + in.resetReaderIndex(); + return isV0; + } + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { + ByteBuf frame; + Object decoded; + byte version; + try { + // 识别版本号,获取当前版本号 + if (isV0(in)) { + decoded = in; + version = ProtocolConstants.VERSION_0; + } else { + decoded = super.decode(ctx, in); + version = decideVersion(decoded); + } + + if (decoded instanceof ByteBuf) { + frame = (ByteBuf) decoded; + ProtocolDecoder decoder = protocolDecoderMap.get(version); + ProtocolEncoder encoder = protocolEncoderMap.get(version); + try { + if (decoder == null || encoder == null) { + throw new UnsupportedOperationException("Unsupported version: " + version); + } + // 首次进来,使用判断好的decoder进行操作 + return decoder.decodeFrame(frame); + } finally { + if (version != ProtocolConstants.VERSION_0) { + frame.release(); + } + // 首次进来,绑定对应version的encoder和decoder,也就相当于绑定了channel + ctx.pipeline().addLast((ChannelHandler)decoder); + ctx.pipeline().addLast((ChannelHandler)encoder); + if (channelHandlers != null) { + ctx.pipeline().addLast(channelHandlers); + } + // 绑定好之后,将自身移除,后续不再判断 + ctx.pipeline().remove(this); + } + } + } catch (Exception exx) { + LOGGER.error("Decode frame error, cause: {}", exx.getMessage()); + throw new DecodeException(exx); + } + return decoded; + } + + protected byte decideVersion(Object in) { + if (in instanceof ByteBuf) { + ByteBuf frame = (ByteBuf) in; + frame.markReaderIndex(); + byte b0 = frame.readByte(); + byte b1 = frame.readByte(); + // 和isV0方法相似,取第3个byte + if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0 || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) { + throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1); + } + + byte version = frame.readByte(); + frame.resetReaderIndex(); + return version; + } + return -1; + } +``` +通过上面的分析,v0终于派上用场(当有旧版本的client注册时,server就会为其分配低版本的encoder/decoder),我们也摸清了多版本协议如何识别、如何绑定。 + + + + + + diff --git a/static/img/blog/rpc_multi-protocol/00-netty-layer.png b/static/img/blog/rpc_multi-protocol/00-netty-layer.png new file mode 100644 index 0000000000..01121a0d59 Binary files /dev/null and b/static/img/blog/rpc_multi-protocol/00-netty-layer.png differ diff --git a/static/img/blog/rpc_multi-protocol/01-netty-class.jpg b/static/img/blog/rpc_multi-protocol/01-netty-class.jpg new file mode 100644 index 0000000000..fb56f23e29 Binary files /dev/null and b/static/img/blog/rpc_multi-protocol/01-netty-class.jpg differ diff --git a/static/img/blog/rpc_multi-protocol/02-processor.jpg b/static/img/blog/rpc_multi-protocol/02-processor.jpg new file mode 100644 index 0000000000..ee16f6849c Binary files /dev/null and b/static/img/blog/rpc_multi-protocol/02-processor.jpg differ diff --git a/static/img/blog/rpc_multi-protocol/03-netty-handler.jpg b/static/img/blog/rpc_multi-protocol/03-netty-handler.jpg new file mode 100644 index 0000000000..178921adfa Binary files /dev/null and b/static/img/blog/rpc_multi-protocol/03-netty-handler.jpg differ diff --git a/static/img/blog/rpc_multi-protocol/04-protocol.jpg b/static/img/blog/rpc_multi-protocol/04-protocol.jpg new file mode 100644 index 0000000000..1d241772f7 Binary files /dev/null and b/static/img/blog/rpc_multi-protocol/04-protocol.jpg differ diff --git a/static/img/blog/rpc_multi-protocol/05-encode-decode-class.jpg b/static/img/blog/rpc_multi-protocol/05-encode-decode-class.jpg new file mode 100644 index 0000000000..3e95375fd3 Binary files /dev/null and b/static/img/blog/rpc_multi-protocol/05-encode-decode-class.jpg differ diff --git a/static/img/blog/rpc_multi-protocol/06-multi-protocol.png b/static/img/blog/rpc_multi-protocol/06-multi-protocol.png new file mode 100644 index 0000000000..fd530052dd Binary files /dev/null and b/static/img/blog/rpc_multi-protocol/06-multi-protocol.png differ --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@seata.apache.org For additional commands, e-mail: notifications-h...@seata.apache.org