This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new 08eb0e65f6 feature: multi-version seata protocol support (#6226)
08eb0e65f6 is described below

commit 08eb0e65f6769048bef8533b66691a682854621f
Author: justabug <[email protected]>
AuthorDate: Thu Jun 27 14:23:43 2024 +0800

    feature: multi-version seata protocol support (#6226)
---
 .../org/apache/seata/core/protocol/RpcMessage.java |  10 ++
 .../org/apache/seata/core/protocol/Version.java    |  10 ++
 .../rpc/netty/AbstractNettyRemotingServer.java     |  40 ++++--
 .../core/rpc/netty/CompatibleProtocolDecoder.java  | 155 +++++++++++++++++++++
 .../core/rpc/netty/CompatibleProtocolEncoder.java  |  79 +++++++++++
 .../seata/core/rpc/netty/NettyClientBootstrap.java |   6 +-
 .../seata/core/rpc/netty/NettyServerBootstrap.java |   6 +-
 .../seata/core/rpc/netty/ProtocolDecoder.java      |  29 ++++
 .../seata/core/rpc/netty/ProtocolEncoder.java      |  28 ++++
 .../seata/core/rpc/netty/v0/MessageCodecV0.java    |  44 ++++++
 .../core/rpc/netty/v0/ProtocolConstantsV0.java     |  31 +++++
 .../seata/core/rpc/netty/v0/ProtocolDecoderV0.java | 132 ++++++++++++++++++
 .../seata/core/rpc/netty/v0/ProtocolEncoderV0.java | 106 ++++++++++++++
 ...otocolV1Decoder.java => ProtocolDecoderV1.java} |  60 ++------
 ...otocolV1Encoder.java => ProtocolEncoderV1.java} | 103 +++++++-------
 .../core/rpc/processor/server/RegTmProcessor.java  |   4 +-
 .../core/rpc/netty/mockserver/RmClientTest.java    |   6 -
 .../core/rpc/netty/mockserver/TmClientTest.java    |   6 -
 .../seata/core/rpc/netty/v1/ProtocolV1Client.java  |   7 +-
 .../seata/core/rpc/netty/v1/ProtocolV1Server.java  |  10 +-
 20 files changed, 734 insertions(+), 138 deletions(-)

diff --git a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java 
b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java
index 4f0963b20f..6a7e4d9da5 100644
--- a/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java
+++ b/core/src/main/java/org/apache/seata/core/protocol/RpcMessage.java
@@ -35,6 +35,8 @@ public class RpcMessage implements Serializable {
     private Map<String, String> headMap = new HashMap<>();
     private Object body;
 
+    private String otherSideVersion;
+
     /**
      * Gets id.
      *
@@ -169,6 +171,14 @@ public class RpcMessage implements Serializable {
         this.messageType = messageType;
     }
 
+    public String getOtherSideVersion() {
+        return otherSideVersion;
+    }
+
+    public void setOtherSideVersion(String otherSideVersion) {
+        this.otherSideVersion = otherSideVersion;
+    }
+
     @Override
     public String toString() {
         return StringUtils.toString(this);
diff --git a/core/src/main/java/org/apache/seata/core/protocol/Version.java 
b/core/src/main/java/org/apache/seata/core/protocol/Version.java
index a9be3d9c09..d1a95924cf 100644
--- a/core/src/main/java/org/apache/seata/core/protocol/Version.java
+++ b/core/src/main/java/org/apache/seata/core/protocol/Version.java
@@ -150,6 +150,16 @@ public class Version {
         return -1;
     }
 
+    public static byte calcProtocolVersion(String sdkVersion) throws 
IncompatibleVersionException {
+        long version = convertVersion(sdkVersion);
+        long v0 = convertVersion(VERSION_0_7_1);
+        if (version <= v0) {
+            return ProtocolConstants.VERSION_0;
+        } else {
+            return ProtocolConstants.VERSION_1;
+        }
+    }
+
     private static long calculatePartValue(String partNumeric, int size, int 
index) {
         return Long.parseLong(partNumeric) * Double.valueOf(Math.pow(100, size 
- index)).longValue();
     }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
index 72324c2689..9be9e79c3b 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
@@ -69,7 +69,7 @@ public abstract class AbstractNettyRemotingServer extends 
AbstractNettyRemoting
         if (channel == null) {
             throw new RuntimeException("rm client is not connected. dbkey:" + 
resourceId + ",clientId:" + clientId);
         }
-        RpcMessage rpcMessage = buildRequestMessage(msg, 
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
+        RpcMessage rpcMessage = buildRequestMessage(channel, msg, 
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
         return super.sendSync(channel, rpcMessage, 
NettyServerConfig.getRpcRequestTimeout());
     }
 
@@ -78,7 +78,7 @@ public abstract class AbstractNettyRemotingServer extends 
AbstractNettyRemoting
         if (channel == null) {
             throw new RuntimeException("client is not connected");
         }
-        RpcMessage rpcMessage = buildRequestMessage(msg, 
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
+        RpcMessage rpcMessage = buildRequestMessage(channel, msg, 
ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
         return super.sendSync(channel, rpcMessage, 
NettyServerConfig.getRpcRequestTimeout());
     }
 
@@ -87,26 +87,42 @@ public abstract class AbstractNettyRemotingServer extends 
AbstractNettyRemoting
         if (channel == null) {
             throw new RuntimeException("client is not connected");
         }
-        RpcMessage rpcMessage = buildRequestMessage(msg, 
ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
+        RpcMessage rpcMessage = buildRequestMessage(channel, msg, 
ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
         super.sendAsync(channel, rpcMessage);
     }
 
     @Override
     public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, 
Object msg) {
-        final Channel clientChannel = msg instanceof HeartbeatMessage
-                ? channel
-                : ChannelManager.getSameClientChannel(channel);
-
-        if (clientChannel == null) {
-            throw new RuntimeException("Not found client channel to response | 
channel: " + channel);
+        Channel clientChannel = channel;
+        if (!(msg instanceof HeartbeatMessage)) {
+            clientChannel = ChannelManager.getSameClientChannel(channel);
         }
-
-        RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg 
instanceof HeartbeatMessage
+        if (clientChannel != null) {
+            RpcMessage rpcMsg = buildResponseMessage(channel, rpcMessage, msg, 
msg instanceof HeartbeatMessage
                 ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE
                 : ProtocolConstants.MSGTYPE_RESPONSE);
-        super.sendAsync(clientChannel, rpcMsg);
+            super.sendAsync(clientChannel, rpcMsg);
+        } else {
+            throw new RuntimeException("channel is error.");
+        }
     }
 
+
+    private RpcMessage buildResponseMessage(Channel channel, RpcMessage 
fromRpcMessage, Object msg, byte messageType) {
+        RpcMessage rpcMessage = super.buildResponseMessage(fromRpcMessage, 
msg, messageType);
+        RpcContext rpcContext = 
ChannelManager.getContextFromIdentified(channel);
+        rpcMessage.setOtherSideVersion(rpcContext.getVersion());
+        return rpcMessage;
+    }
+
+    protected RpcMessage buildRequestMessage(Channel channel, Object msg, byte 
messageType) {
+        RpcMessage rpcMessage = super.buildRequestMessage(msg, messageType);
+        RpcContext rpcContext = 
ChannelManager.getContextFromIdentified(channel);
+        rpcMessage.setOtherSideVersion(rpcContext.getVersion());
+        return rpcMessage;
+    }
+
+
     @Override
     public void registerProcessor(int messageType, RemotingProcessor 
processor, ExecutorService executor) {
         Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, 
executor);
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java
new file mode 100644
index 0000000000..d066984c23
--- /dev/null
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolDecoder.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import com.google.common.collect.ImmutableMap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.seata.core.exception.DecodeException;
+import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.rpc.netty.v0.ProtocolDecoderV0;
+import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * <pre>
+ * (> 0.7.0)
+ * 0     1     2     3     4     5     6     7     8     9    10     11    12  
  13    14    15    16
+ * 
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+ * |   magic   |Proto|     Full length       |    Head   | Msg |Seria|Compr|   
  RequestId         |
+ * |   code    |colVer|    (head+body)      |   Length  |Type |lizer|ess  |    
                   |
+ * 
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
+ *
+ * (<= 0.7.0)
+ * 0     1     2     3     4           6           8          10           12  
        14
+ * 
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+ * |   0xdada  |   flag    | typecode/ |                 requestid             
        |
+ * |           |           | bodylength|                                       
        |
+ * 
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
+ *
+ * </pre>
+ * <p>
+ * <li>Full Length: include all data </li>
+ * <li>Head Length: include head data from magic code to head map. </li>
+ * <li>Body Length: Full Length - Head Length</li>
+ * </p>
+ */
+public class CompatibleProtocolDecoder extends LengthFieldBasedFrameDecoder {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CompatibleProtocolDecoder.class);
+    private static Map<Byte, ProtocolDecoder> protocolDecoderMap;
+
+    public CompatibleProtocolDecoder() {
+        // default is 8M
+        this(ProtocolConstants.MAX_FRAME_LENGTH);
+    }
+
+    public CompatibleProtocolDecoder(int maxFrameLength) {
+        /*
+        int maxFrameLength,      
+        int lengthFieldOffset,  magic code is 2B, and version is 1B, and then 
FullLength. so value is 3
+        int lengthFieldLength,  FullLength is int(4B). so values is 4
+        int lengthAdjustment,   FullLength include all data and read 7 bytes 
before, so the left length is (FullLength-7). so values is -7
+        int initialBytesToStrip we will check magic code and version self, so 
do not strip any bytes. so values is 0
+        */
+        super(maxFrameLength, 3, 4, -7, 0);
+        protocolDecoderMap = ImmutableMap.<Byte, ProtocolDecoder>builder()
+                .put(ProtocolConstants.VERSION_0, new ProtocolDecoderV0())
+                .put(ProtocolConstants.VERSION_1, new ProtocolDecoderV1())
+                .build();
+    }
+
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws 
Exception {
+        ByteBuf frame;
+        Object decoded;
+        byte version;
+        try {
+            if (isV0(in)) {
+                decoded = in;
+                version = ProtocolConstants.VERSION_0;
+            } else {
+                decoded = super.decode(ctx, in);
+                version = decideVersion(decoded);
+            }
+
+            if (decoded instanceof ByteBuf) {
+                frame = (ByteBuf) decoded;
+                try {
+                    ProtocolDecoder decoder = protocolDecoderMap.get(version);
+                    if (decoder == null) {
+                        throw new UnsupportedOperationException("Unsupported 
version: " + version);
+                    }
+                    return decoder.decodeFrame(frame);
+                } finally {
+                    if (version != ProtocolConstants.VERSION_0) {
+                        frame.release();
+                    }
+                }
+            }
+        } catch (Exception exx) {
+            LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
+            throw new DecodeException(exx);
+        }
+        return decoded;
+    }
+
+    protected byte decideVersion(Object in) {
+        if (in instanceof ByteBuf) {
+            ByteBuf frame = (ByteBuf) in;
+            frame.markReaderIndex();
+            byte b0 = frame.readByte();
+            byte b1 = frame.readByte();
+            if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
+                    || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
+                throw new IllegalArgumentException("Unknown magic code: " + b0 
+ ", " + b1);
+            }
+
+            byte version = frame.readByte();
+            frame.resetReaderIndex();
+            return version;
+        }
+        return -1;
+    }
+
+
+    protected boolean isV0(ByteBuf in) {
+        boolean isV0 = false;
+        in.markReaderIndex();
+        byte b0 = in.readByte();
+        byte b1 = in.readByte();
+        // v1/v2/v3 : b2 = version
+        // v0 : 1st byte in FLAG(2byte:0x10/0x20/0x40/0x80)
+        byte b2 = in.readByte();
+        if (ProtocolConstants.MAGIC_CODE_BYTES[0] == b0
+                && ProtocolConstants.MAGIC_CODE_BYTES[1] == b1
+                && 0 == b2) {
+            isV0 = true;
+        }
+
+        in.resetReaderIndex();
+        return isV0;
+    }
+
+    protected boolean isV0(byte version) {
+        return version == ProtocolConstants.VERSION_0;
+    }
+}
\ No newline at end of file
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java
new file mode 100644
index 0000000000..e588b92b8e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/CompatibleProtocolEncoder.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import com.google.common.collect.ImmutableMap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.protocol.Version;
+import org.apache.seata.core.rpc.netty.v0.ProtocolEncoderV0;
+import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Compatible Protocol Encoder
+ * <p>
+ * <li>Full Length: include all data </li>
+ * <li>Head Length: include head data from magic code to head map. </li>
+ * <li>Body Length: Full Length - Head Length</li>
+ * </p>
+ */
+public class CompatibleProtocolEncoder extends MessageToByteEncoder {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CompatibleProtocolEncoder.class);
+
+    private static Map<Byte, ProtocolEncoder> protocolEncoderMap;
+
+    public CompatibleProtocolEncoder() {
+        super();
+        protocolEncoderMap = ImmutableMap.<Byte, ProtocolEncoder>builder()
+                .put(ProtocolConstants.VERSION_0, new ProtocolEncoderV0())
+                .put(ProtocolConstants.VERSION_1, new ProtocolEncoderV1())
+                .build();
+    }
+
+    @Override
+    public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
+        try {
+            if (msg instanceof RpcMessage) {
+                RpcMessage rpcMessage = (RpcMessage) msg;
+                String sdkVersion = rpcMessage.getOtherSideVersion();
+                if (StringUtils.isBlank(sdkVersion)) {
+                    sdkVersion = Version.getCurrent();
+                }
+                byte protocolVersion = Version.calcProtocolVersion(sdkVersion);
+                ProtocolEncoder encoder = 
protocolEncoderMap.get(protocolVersion);
+                if (encoder == null) {
+                    throw new UnsupportedOperationException("Unsupported 
protocolVersion: " + protocolVersion);
+                }
+
+                encoder.encode(rpcMessage, out);
+            } else {
+                throw new UnsupportedOperationException("Not support this 
class:" + msg.getClass());
+            }
+        } catch (Throwable e) {
+            LOGGER.error("Encode request error!", e);
+        }
+    }
+}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java
index 63b15a7466..4867f86bcf 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java
@@ -35,8 +35,6 @@ import io.netty.util.internal.PlatformDependent;
 import org.apache.seata.common.exception.FrameworkException;
 import org.apache.seata.common.thread.NamedThreadFactory;
 import org.apache.seata.core.rpc.RemotingBootstrap;
-import org.apache.seata.core.rpc.netty.v1.ProtocolV1Decoder;
-import org.apache.seata.core.rpc.netty.v1.ProtocolV1Encoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,8 +132,8 @@ public class NettyClientBootstrap implements 
RemotingBootstrap {
                         new 
IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                             nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                             nettyClientConfig.getChannelMaxAllIdleSeconds()))
-                        .addLast(new ProtocolV1Decoder())
-                        .addLast(new ProtocolV1Encoder());
+                        .addLast(new CompatibleProtocolDecoder())
+                        .addLast(new CompatibleProtocolEncoder());
                     if (channelHandlers != null) {
                         addChannelPipelineLast(ch, channelHandlers);
                     }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
index 6c9a325882..b847b2a96d 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
@@ -37,8 +37,6 @@ import org.apache.seata.common.XID;
 import org.apache.seata.common.thread.NamedThreadFactory;
 import org.apache.seata.config.ConfigurationFactory;
 import org.apache.seata.core.rpc.RemotingBootstrap;
-import org.apache.seata.core.rpc.netty.v1.ProtocolV1Decoder;
-import org.apache.seata.core.rpc.netty.v1.ProtocolV1Encoder;
 import org.apache.seata.discovery.registry.MultiRegistryFactory;
 import org.apache.seata.discovery.registry.RegistryService;
 import org.slf4j.Logger;
@@ -161,8 +159,8 @@ public class NettyServerBootstrap implements 
RemotingBootstrap {
                 @Override
                 public void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new 
IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
-                        .addLast(new ProtocolV1Decoder())
-                        .addLast(new ProtocolV1Encoder());
+                        .addLast(new CompatibleProtocolDecoder())
+                        .addLast(new CompatibleProtocolEncoder());
                     if (channelHandlers != null) {
                         addChannelPipelineLast(ch, channelHandlers);
                     }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java
new file mode 100644
index 0000000000..42a7c75c04
--- /dev/null
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDecoder.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * the protocol decoder
+ *
+ **/
+public interface ProtocolDecoder {
+
+    ProtocolRpcMessage decodeFrame(ByteBuf in);
+
+}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolEncoder.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolEncoder.java
new file mode 100644
index 0000000000..6c91164fff
--- /dev/null
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolEncoder.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.seata.core.protocol.RpcMessage;
+
+/**
+ * the protocol encoder
+ *
+ **/
+public interface ProtocolEncoder {
+    void encode(RpcMessage rpcMessage, ByteBuf out);
+}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/MessageCodecV0.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/MessageCodecV0.java
new file mode 100644
index 0000000000..ab1d4f7471
--- /dev/null
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/MessageCodecV0.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v0;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.seata.core.protocol.MessageTypeAware;
+
+/**
+ * The interface Message codec.
+ *
+ */
+public interface MessageCodecV0<T> extends MessageTypeAware {
+
+    /**
+     * Encode byte [ ].
+     *
+     * @return the byte [ ]
+     */
+    byte[] encode();
+
+    /**
+     * Decode boolean.
+     *
+     * @param in the in
+     * @return the boolean
+     */
+    boolean decode(ByteBuf in);
+
+    boolean decode(ByteBuf in, T req);
+}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolConstantsV0.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolConstantsV0.java
new file mode 100644
index 0000000000..65b3634059
--- /dev/null
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolConstantsV0.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v0;
+
+/**
+ * protocol v0 constants
+ *
+ **/
+public class ProtocolConstantsV0 {
+    public static short MAGIC = (short)0xdada;
+
+    public static int HEAD_LENGTH = 14;
+    public static final short FLAG_REQUEST = 0x80;
+    public static final short FLAG_ASYNC = 0x40;
+    public static final short FLAG_HEARTBEAT = 0x20;
+    public static final short FLAG_SEATA_CODEC = 0x10;
+}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java
new file mode 100644
index 0000000000..42e112a2f6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolDecoderV0.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v0;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.seata.core.protocol.HeartbeatMessage;
+
+import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.rpc.netty.ProtocolDecoder;
+import org.apache.seata.core.serializer.Serializer;
+import org.apache.seata.core.serializer.SerializerServiceLoader;
+import org.apache.seata.core.serializer.SerializerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <pre>
+ *  seata-version < 0.7
+ *  Only used in TC receives a request from RM/TM.
+ * 0     1     2     3     4           6           8          10           12  
        14         16
+ * 
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+ * |   0xdada  |   flag    | typecode/ |                 requestid             
        |           |
+ * |           |           | bodylength|                                       
        |           |
+ * 
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
           +
+ * |                                    ... ...                                
                    |
+ * +                                                                           
                    +
+ * |                                     body                                  
                    |
+ * +                                                                           
                    +
+ * |                                    ... ...                                
                    |
+ * 
+-----------------------------------------------------------------------------------------------+
+ *
+ * </pre>
+ * <p>
+ * <li>flag: msg type </li>
+ * <li>typecode: action type code </li>
+ * <li>bodylength: body Length </li>
+ * <li>requestid: request id</li>
+ * </p>
+ *
+ * @see ProtocolEncoderV0
+ */
+public class ProtocolDecoderV0 implements ProtocolDecoder {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolDecoderV0.class);
+
+
+    @Override
+    public ProtocolRpcMessageV0 decodeFrame(ByteBuf in) {
+        ProtocolRpcMessageV0 rpcMessage = new ProtocolRpcMessageV0();
+        if (in.readableBytes() < ProtocolConstantsV0.HEAD_LENGTH) {
+            throw new IllegalArgumentException("Nothing to decode.");
+        }
+
+        in.markReaderIndex();
+        short protocol = in.readShort();
+        int flag = (int) in.readShort();
+
+        boolean isHeartbeat = (ProtocolConstantsV0.FLAG_HEARTBEAT & flag) > 0;
+        boolean isRequest = (ProtocolConstantsV0.FLAG_REQUEST & flag) > 0;
+        boolean isSeataCodec = (ProtocolConstantsV0.FLAG_SEATA_CODEC & flag) > 
0;
+        rpcMessage.setSeataCodec(isSeataCodec);
+
+        short bodyLength = 0;
+        short typeCode = 0;
+        if (!isSeataCodec) {
+            bodyLength = in.readShort();
+        } else {
+            typeCode = in.readShort();
+        }
+        long msgId = in.readLong();
+        rpcMessage.setId(msgId);
+        if (isHeartbeat) {
+            rpcMessage.setAsync(true);
+            rpcMessage.setHeartbeat(isHeartbeat);
+            rpcMessage.setRequest(isRequest);
+            if (isRequest) {
+                rpcMessage.setBody(HeartbeatMessage.PING);
+            } else {
+                rpcMessage.setBody(HeartbeatMessage.PONG);
+            }
+
+            return rpcMessage;
+        }
+
+        if (bodyLength > 0 && in.readableBytes() < bodyLength) {
+            in.resetReaderIndex();
+            throw new IllegalArgumentException("readableBytes < bodyLength");
+        }
+
+        rpcMessage.setAsync((ProtocolConstantsV0.FLAG_ASYNC & flag) > 0);
+        rpcMessage.setHeartbeat(false);
+        rpcMessage.setRequest(isRequest);
+
+        try {
+            int length = in.readableBytes();
+            byte[] bs = new byte[length];
+            in.readBytes(bs);
+
+            // fill messageType in v0
+            byte[] bs2 = new byte[2 + length];
+            bs2[0] = (byte) (0x00FF & (typeCode >> 8));
+            bs2[1] = (byte) (0x00FF & typeCode);
+            System.arraycopy(bs, 0, bs2, 2, length);
+            byte codecType = isSeataCodec ? SerializerType.SEATA.getCode() : 
SerializerType.HESSIAN.getCode();
+            Serializer serializer = 
SerializerServiceLoader.load(SerializerType.getByCode(codecType), 
ProtocolConstants.VERSION_0);
+            rpcMessage.setBody(serializer.deserialize(bs2));
+        } catch (Exception e) {
+            LOGGER.error("decode error", e);
+            throw e;
+        }
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Receive:" + rpcMessage.getBody() + ", messageId:" + 
msgId);
+        }
+        return rpcMessage;
+    }
+
+
+}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java
new file mode 100644
index 0000000000..3fc447b281
--- /dev/null
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolEncoderV0.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v0;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.seata.core.protocol.HeartbeatMessage;
+import org.apache.seata.core.protocol.MessageTypeAware;
+import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.rpc.netty.ProtocolEncoder;
+import org.apache.seata.core.serializer.Serializer;
+import org.apache.seata.core.serializer.SerializerServiceLoader;
+import org.apache.seata.core.serializer.SerializerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <pre>
+ *  seata-version < 0.7
+ *  Only used in TC send a request to RM/TM.
+ * 0     1     2     3     4           6           8          10           12  
        14         16
+ * 
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+ * |   0xdada  |   flag    | typecode/ |                 requestid             
        |           |
+ * |           |           | bodylength|                                       
        |           |
+ * 
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
           +
+ * |                                    ... ...                                
                    |
+ * +                                                                           
                    +
+ * |                                     body                                  
                    |
+ * +                                                                           
                    +
+ * |                                    ... ...                                
                    |
+ * 
+-----------------------------------------------------------------------------------------------+
+ *
+ * </pre>
+ * <p>
+ * <li>flag: msg type </li>
+ * <li>typecode: action type code </li>
+ * <li>bodylength: body Length </li>
+ * <li>requestid: request id</li>
+ * </p>
+ *
+ * @see ProtocolDecoderV0
+ */
+public class ProtocolEncoderV0 implements ProtocolEncoder {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolEncoderV0.class);
+
+    @Override
+    public void encode(RpcMessage message, ByteBuf out) {
+        try {
+            byte codec = message.getCodec();
+            ProtocolRpcMessageV0 msg = new ProtocolRpcMessageV0();
+            msg.rpcMsg2ProtocolMsg(message);
+
+            out.writeShort(ProtocolConstantsV0.MAGIC);
+            int flag = (msg.isAsync() ? ProtocolConstantsV0.FLAG_ASYNC : 0)
+                    | (msg.isHeartbeat() ? ProtocolConstantsV0.FLAG_HEARTBEAT 
: 0)
+                    | (msg.isRequest() ? ProtocolConstantsV0.FLAG_REQUEST : 0)
+                    | (msg.isSeataCodec() ? 
ProtocolConstantsV0.FLAG_SEATA_CODEC : 0);
+
+            out.writeShort((short) flag);
+
+            if (msg.getBody() instanceof HeartbeatMessage) {
+                out.writeShort((short) 0);
+                out.writeLong(msg.getId());
+                return;
+            }
+
+            byte[] bodyBytes = null;
+            Serializer serializer = 
SerializerServiceLoader.load(SerializerType.getByCode(codec), 
ProtocolConstants.VERSION_0);
+            bodyBytes = serializer.serialize(msg.getBody());
+
+            if (msg.isSeataCodec()) {
+                if (msg.getBody() instanceof MessageTypeAware) {
+                    short typeCode = ((MessageTypeAware) 
msg.getBody()).getTypeCode();
+                    out.writeShort(typeCode);
+                }
+            } else {
+                out.writeShort(bodyBytes.length);
+            }
+            out.writeLong(msg.getId());
+            if (bodyBytes != null) {
+                out.writeBytes(bodyBytes);
+            }
+
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Send:" + msg.getBody());
+            }
+        } catch (Throwable e) {
+            LOGGER.error("Encode request error!", e);
+        }
+    }
+}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java
similarity index 76%
rename from 
core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
rename to 
core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java
index 26ef52ffdc..9ca4977944 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolDecoderV1.java
@@ -20,19 +20,19 @@ import java.util.List;
 import java.util.Map;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
 import org.apache.seata.core.compressor.Compressor;
 import org.apache.seata.core.compressor.CompressorFactory;
-import org.apache.seata.core.exception.DecodeException;
 import org.apache.seata.core.protocol.HeartbeatMessage;
 import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.rpc.netty.ProtocolDecoder;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
 import org.apache.seata.core.serializer.Serializer;
 import org.apache.seata.core.serializer.SerializerServiceLoader;
 import org.apache.seata.core.serializer.SerializerType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * <pre>
  * 0     1     2     3     4     5     6     7     8     9    10     11    12  
  13    14    15    16
@@ -56,57 +56,24 @@ import org.slf4j.LoggerFactory;
  * </p>
  * https://github.com/seata/seata/issues/893
  *
- * @see ProtocolV1Encoder
+ * @see ProtocolEncoderV1
+ * @author Geng Zhang
+ * @see ProtocolEncoderV1
  * @since 0.7.0
  */
-public class ProtocolV1Decoder extends LengthFieldBasedFrameDecoder {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolV1Decoder.class);
+public class ProtocolDecoderV1 implements ProtocolDecoder {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolDecoderV1.class);
     private final List<SerializerType> supportDeSerializerTypes;
 
-
-    public ProtocolV1Decoder() {
-        // default is 8M
-        this(ProtocolConstants.MAX_FRAME_LENGTH);
-    }
-
-    public ProtocolV1Decoder(int maxFrameLength) {
-        /*
-        int maxFrameLength,      
-        int lengthFieldOffset,  magic code is 2B, and version is 1B, and then 
FullLength. so value is 3
-        int lengthFieldLength,  FullLength is int(4B). so values is 4
-        int lengthAdjustment,   FullLength include all data and read 7 bytes 
before, so the left length is (FullLength-7). so values is -7
-        int initialBytesToStrip we will check magic code and version self, so 
do not strip any bytes. so values is 0
-        */
-        super(maxFrameLength, 3, 4, -7, 0);
+    public ProtocolDecoderV1() {
         supportDeSerializerTypes = 
SerializerServiceLoader.getSupportedSerializers();
         if (supportDeSerializerTypes.isEmpty()) {
             throw new IllegalArgumentException("No serializer found");
-        }
-    }
+        }    }
 
     @Override
-    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws 
Exception {
-        Object decoded;
-        try {
-            decoded = super.decode(ctx, in);
-            if (decoded instanceof ByteBuf) {
-                ByteBuf frame = (ByteBuf)decoded;
-                try {
-                    return decodeFrame(frame);
-                } finally {
-                    frame.release();
-                }
-            }
-        } catch (Exception exx) {
-            LOGGER.error("Decode frame error, cause: {}", exx.getMessage());
-            throw new DecodeException(exx);
-        }
-        return decoded;
-    }
-
-    public Object decodeFrame(ByteBuf frame) {
+    public ProtocolRpcMessage decodeFrame(ByteBuf frame) {
         byte b0 = frame.readByte();
         byte b1 = frame.readByte();
         if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
@@ -115,7 +82,6 @@ public class ProtocolV1Decoder extends 
LengthFieldBasedFrameDecoder {
         }
 
         byte version = frame.readByte();
-        // TODO  check version compatible here
 
         int fullLength = frame.readInt();
         short headLength = frame.readShort();
@@ -154,13 +120,11 @@ public class ProtocolV1Decoder extends 
LengthFieldBasedFrameDecoder {
                     Serializer serializer = 
SerializerServiceLoader.load(protocolType, ProtocolConstants.VERSION_1);
                     rpcMessage.setBody(serializer.deserialize(bs));
                 } else {
-                    throw new IllegalArgumentException("SerializerType not 
match: " + protocolType.name());
+                    throw new IllegalArgumentException("SerializerType not 
match");
                 }
             }
         }
 
         return rpcMessage;
     }
-
-
 }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java
similarity index 51%
rename from 
core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
rename to 
core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java
index 575992fd87..14cbcdb55d 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolEncoderV1.java
@@ -17,8 +17,7 @@
 package org.apache.seata.core.rpc.netty.v1;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.seata.core.rpc.netty.ProtocolEncoder;
 import org.apache.seata.core.serializer.Serializer;
 import org.apache.seata.core.compressor.Compressor;
 import org.apache.seata.core.compressor.CompressorFactory;
@@ -54,70 +53,70 @@ import java.util.Map;
  * </p>
  * https://github.com/seata/seata/issues/893
  *
- * @see ProtocolV1Decoder
+ * @author Geng Zhang
+ * @see ProtocolDecoderV1
  * @since 0.7.0
  */
-public class ProtocolV1Encoder extends MessageToByteEncoder {
+public class ProtocolEncoderV1 implements ProtocolEncoder {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolV1Encoder.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProtocolEncoderV1.class);
 
-    @Override
-    public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
-        try {
 
-            if (msg instanceof RpcMessage) {
-                RpcMessage rpcMsg = (RpcMessage) msg;
+    public void encode(RpcMessage message, ByteBuf out) {
+        try {
 
-                ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
-                rpcMessage.rpcMsg2ProtocolMsg(rpcMsg);
-                int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
-                int headLength = ProtocolConstants.V1_HEAD_LENGTH;
+            ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
+            rpcMessage.rpcMsg2ProtocolMsg(message);
 
-                byte messageType = rpcMessage.getMessageType();
-                out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
-                out.writeByte(ProtocolConstants.VERSION);
-                // full Length(4B) and head length(2B) will fix in the end. 
-                out.writerIndex(out.writerIndex() + 6);
-                out.writeByte(messageType);
-                out.writeByte(rpcMessage.getCodec());
-                out.writeByte(rpcMessage.getCompressor());
-                out.writeInt(rpcMessage.getId());
+            int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
+            int headLength = ProtocolConstants.V1_HEAD_LENGTH;
 
-                // direct write head with zero-copy
-                Map<String, String> headMap = rpcMessage.getHeadMap();
-                if (headMap != null && !headMap.isEmpty()) {
-                    int headMapBytesLength = 
HeadMapSerializer.getInstance().encode(headMap, out);
-                    headLength += headMapBytesLength;
-                    fullLength += headMapBytesLength;
-                }
+            byte messageType = rpcMessage.getMessageType();
+            out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
+            out.writeByte(ProtocolConstants.VERSION_1);
+            // full Length(4B) and head length(2B) will fix in the end.
+            out.writerIndex(out.writerIndex() + 6);
+            out.writeByte(messageType);
+            out.writeByte(rpcMessage.getCodec());
+            out.writeByte(rpcMessage.getCompressor());
+            out.writeInt(rpcMessage.getId());
 
-                byte[] bodyBytes = null;
-                if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
-                        && messageType != 
ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
-                    // heartbeat has no body
-                    Serializer serializer = 
SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), 
ProtocolConstants.VERSION_1);
-                    bodyBytes = serializer.serialize(rpcMessage.getBody());
-                    Compressor compressor = 
CompressorFactory.getCompressor(rpcMessage.getCompressor());
-                    bodyBytes = compressor.compress(bodyBytes);
-                    fullLength += bodyBytes.length;
-                }
+            // direct write head with zero-copy
+            Map<String, String> headMap = rpcMessage.getHeadMap();
+            if (headMap != null && !headMap.isEmpty()) {
+                int headMapBytesLength = 
HeadMapSerializer.getInstance().encode(headMap, out);
+                headLength += headMapBytesLength;
+                fullLength += headMapBytesLength;
+            }
 
-                if (bodyBytes != null) {
-                    out.writeBytes(bodyBytes);
-                }
+            byte[] bodyBytes = null;
+            if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
+                    && messageType != 
ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
+                // heartbeat has no body
+                Serializer serializer = 
SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()), 
ProtocolConstants.VERSION_1);
+                bodyBytes = serializer.serialize(rpcMessage.getBody());
+                Compressor compressor = 
CompressorFactory.getCompressor(rpcMessage.getCompressor());
+                bodyBytes = compressor.compress(bodyBytes);
+                fullLength += bodyBytes.length;
+            }
 
-                // fix fullLength and headLength
-                int writeIndex = out.writerIndex();
-                // skip magic code(2B) + version(1B)
-                out.writerIndex(writeIndex - fullLength + 3);
-                out.writeInt(fullLength);
-                out.writeShort(headLength);
-                out.writerIndex(writeIndex);
-            } else {
-                throw new UnsupportedOperationException("Not support this 
class:" + msg.getClass());
+            if (bodyBytes != null) {
+                out.writeBytes(bodyBytes);
             }
+
+            // fix fullLength and headLength
+            int writeIndex = out.writerIndex();
+            // skip magic code(2B) + version(1B)
+            out.writerIndex(writeIndex - fullLength + 3);
+            out.writeInt(fullLength);
+            out.writeShort(headLength);
+            out.writerIndex(writeIndex);
+
+
         } catch (Throwable e) {
             LOGGER.error("Encode request error!", e);
+            // todo
+            throw e;
         }
     }
 }
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java
 
b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java
index 0afee867f8..6090232c6c 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/processor/server/RegTmProcessor.java
@@ -89,8 +89,8 @@ public class RegTmProcessor implements RemotingProcessor {
         }
         remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
         if (isSuccess && LOGGER.isInfoEnabled()) {
-            LOGGER.info("TM register success,message:{},channel:{},client 
version:{}", message, ctx.channel(),
-                message.getVersion());
+            LOGGER.info("TM register success,message:{},channel:{},client 
version:{},client protocol-version:{}"
+                    , message, ctx.channel(), message.getVersion(), 
rpcMessage.getOtherSideVersion());
         }
     }
 
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/RmClientTest.java
 
b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/RmClientTest.java
index 671f1d4431..c7f100bce7 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/RmClientTest.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/RmClientTest.java
@@ -17,8 +17,6 @@
 package org.apache.seata.core.rpc.netty.mockserver;
 
 import io.netty.channel.Channel;
-import org.apache.seata.common.ConfigurationKeys;
-import org.apache.seata.common.ConfigurationTestHelper;
 import org.apache.seata.core.context.RootContext;
 import org.apache.seata.core.exception.TransactionException;
 import org.apache.seata.core.model.BranchStatus;
@@ -27,13 +25,9 @@ import org.apache.seata.core.protocol.HeartbeatMessage;
 import org.apache.seata.core.rpc.netty.ChannelManagerTestHelper;
 import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
 import 
org.apache.seata.integration.tx.api.interceptor.parser.DefaultResourceRegisterParser;
-import org.apache.seata.mockserver.MockServer;
 import org.apache.seata.rm.DefaultResourceManager;
 import org.apache.seata.rm.RMClient;
-import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/TmClientTest.java
 
b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/TmClientTest.java
index b204b0da81..1bd2325a9d 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/TmClientTest.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/mockserver/TmClientTest.java
@@ -17,21 +17,15 @@
 package org.apache.seata.core.rpc.netty.mockserver;
 
 import io.netty.channel.Channel;
-import org.apache.seata.common.ConfigurationKeys;
-import org.apache.seata.common.ConfigurationTestHelper;
 import org.apache.seata.core.model.GlobalStatus;
 import org.apache.seata.core.model.TransactionManager;
 import org.apache.seata.core.protocol.ResultCode;
 import org.apache.seata.core.rpc.netty.ChannelManagerTestHelper;
 import org.apache.seata.core.rpc.netty.TmNettyRemotingClient;
 import org.apache.seata.mockserver.MockCoordinator;
-import org.apache.seata.mockserver.MockServer;
 import org.apache.seata.tm.DefaultTransactionManager;
 import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java
index 5d20e18b1f..3f52ed63c5 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Client.java
@@ -45,6 +45,8 @@ import org.apache.seata.core.protocol.ProtocolConstants;
 import org.apache.seata.core.protocol.RpcMessage;
 import org.apache.seata.core.protocol.transaction.BranchCommitRequest;
 import org.apache.seata.core.serializer.SerializerType;
+import org.apache.seata.core.rpc.netty.CompatibleProtocolDecoder;
+import org.apache.seata.core.rpc.netty.CompatibleProtocolEncoder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,8 +82,8 @@ public class ProtocolV1Client {
             @Override
             protected void initChannel(Channel channel) throws Exception {
                 ChannelPipeline pipeline = channel.pipeline();
-                pipeline.addLast(new ProtocolV1Encoder());
-                pipeline.addLast(new ProtocolV1Decoder(8 * 1024 * 1024));
+                pipeline.addLast(new CompatibleProtocolEncoder());
+                pipeline.addLast(new CompatibleProtocolDecoder(8 * 1024 * 
1024));
                 pipeline.addLast(new 
ClientChannelHandler(ProtocolV1Client.this));
             }
         });
@@ -124,6 +126,7 @@ public class ProtocolV1Client {
         rpcMessage.setBody(body);
         rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
 
+
         if (channel != null) {
             DefaultPromise promise = new DefaultPromise(defaultEventExecutor);
             futureMap.put(msgId, promise);
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java
index 9a0d067687..20a2fdbcc5 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Server.java
@@ -35,6 +35,12 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.logging.LogLevel;
 import io.netty.handler.logging.LoggingHandler;
 import org.apache.seata.common.thread.NamedThreadFactory;
+import org.apache.seata.common.thread.NamedThreadFactory;
+import org.apache.seata.core.rpc.netty.CompatibleProtocolDecoder;
+import org.apache.seata.core.rpc.netty.CompatibleProtocolEncoder;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
 
 /**
  */
@@ -67,8 +73,8 @@ public class ProtocolV1Server {
                     @Override
                     protected void initChannel(Channel channel) throws 
Exception {
                         ChannelPipeline pipeline = channel.pipeline();
-                        pipeline.addLast(new ProtocolV1Decoder(8 * 1024 * 
1024));
-                        pipeline.addLast(new ProtocolV1Encoder());
+                        pipeline.addLast(new CompatibleProtocolDecoder(8 * 
1024 * 1024));
+                        pipeline.addLast(new CompatibleProtocolEncoder());
                         pipeline.addLast(new ServerChannelHandler());
                     }
                 });


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to