This is an automated email from the ASF dual-hosted git repository.

jianbin pushed a commit to branch 2.x
in repository https://gitbox.apache.org/repos/asf/incubator-seata.git


The following commit(s) were added to refs/heads/2.x by this push:
     new eb19e98a60 optimize : Eliminate RpcMessage and Encoder/Decoder 
dependencies (#6209)
eb19e98a60 is described below

commit eb19e98a60afd611394c648614a5683b948cd7ec
Author: justabug <[email protected]>
AuthorDate: Mon Jun 24 10:03:27 2024 +0800

    optimize : Eliminate RpcMessage and Encoder/Decoder dependencies (#6209)
---
 changes/en-us/2.x.md                               |   1 +
 changes/zh-cn/2.x.md                               |   2 +-
 .../rpc/netty/AbstractNettyRemotingClient.java     |   9 +-
 .../rpc/netty/AbstractNettyRemotingServer.java     |   9 +-
 .../seata/core/rpc/netty/ProtocolRpcMessage.java   |  37 ++++
 .../core/rpc/netty/v0/ProtocolRpcMessageV0.java    | 203 +++++++++++++++++++++
 .../core/rpc/netty/v1/ProtocolRpcMessageV1.java    | 195 ++++++++++++++++++++
 .../seata/core/rpc/netty/v1/ProtocolV1Decoder.java |   3 +-
 .../seata/core/rpc/netty/v1/ProtocolV1Encoder.java |   4 +-
 .../core/rpc/netty/v1/ClientChannelHandler.java    |   8 +-
 .../rpc/netty/v1/ProtocolV1SerializerTest.java     |   4 +-
 .../core/rpc/netty/v1/ServerChannelHandler.java    |  10 +-
 12 files changed, 467 insertions(+), 18 deletions(-)

diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md
index f4de08bc26..7941cbbf60 100644
--- a/changes/en-us/2.x.md
+++ b/changes/en-us/2.x.md
@@ -13,6 +13,7 @@ Add changes here for all PR submitted to the 2.x branch.
 ### optimize:
 - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] split the 
task thread pool for committing and rollbacking statuses
 - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] optimize : 
load SeataSerializer by version
+- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] Eliminate 
RpcMessage and Encoder/Decoder dependencies
 
 ### refactor:
 - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] optimize: 
send async response
diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md
index f9a0eab0c7..25ff7a0587 100644
--- a/changes/zh-cn/2.x.md
+++ b/changes/zh-cn/2.x.md
@@ -12,7 +12,7 @@
 ### optimize:
 - [[#6499](https://github.com/apache/incubator-seata/pull/6499)] 拆分 committing 
和 rollbacking 状态的任务线程池
 - [[#6208](https://github.com/apache/incubator-seata/pull/6208)] 支持多版本的Seata序列化
-
+- [[#6209](https://github.com/apache/incubator-seata/pull/6209)] 解开 RpcMessage 
和 Encoder/Decoder 的互相依赖
 
 ### refactor:
 - [[#6534](https://github.com/apache/incubator-seata/pull/6534)] 优化: 发送异步响应
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
index 2b7b35aca0..2901eb8d3f 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java
@@ -409,10 +409,13 @@ public abstract class AbstractNettyRemotingClient extends 
AbstractNettyRemoting
 
         @Override
         public void channelRead(final ChannelHandlerContext ctx, Object msg) 
throws Exception {
-            if (!(msg instanceof RpcMessage)) {
-                return;
+            RpcMessage rpcMessage = null;
+            if (msg instanceof ProtocolRpcMessage) {
+                rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
+                processMessage(ctx, rpcMessage);
+            } else {
+                LOGGER.error("rpcMessage type error");
             }
-            processMessage(ctx, (RpcMessage) msg);
         }
 
         @Override
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
index 09696286a6..72324c2689 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java
@@ -163,10 +163,13 @@ public abstract class AbstractNettyRemotingServer extends 
AbstractNettyRemoting
          */
         @Override
         public void channelRead(final ChannelHandlerContext ctx, Object msg) 
throws Exception {
-            if (!(msg instanceof RpcMessage)) {
-                return;
+            RpcMessage rpcMessage = null;
+            if (msg instanceof ProtocolRpcMessage) {
+                rpcMessage = ((ProtocolRpcMessage) msg).protocolMsg2RpcMsg();
+                processMessage(ctx, rpcMessage);
+            } else {
+                LOGGER.error("rpcMessage type error");
             }
-            processMessage(ctx, (RpcMessage) msg);
         }
 
         @Override
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java
new file mode 100644
index 0000000000..ba89c508a8
--- /dev/null
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolRpcMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty;
+
+import org.apache.seata.core.protocol.RpcMessage;
+
+/**
+ * The protocol RPC message.
+ */
+public interface ProtocolRpcMessage {
+
+    /**
+     * The protocol message to rpc message.
+     * @return
+     */
+    RpcMessage protocolMsg2RpcMsg();
+
+    /**
+     * The rpc message to protocol message.
+     * @param rpcMessage
+     */
+    void rpcMsg2ProtocolMsg(RpcMessage rpcMessage);
+}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java
new file mode 100644
index 0000000000..cea2d7e6f7
--- /dev/null
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v0/ProtocolRpcMessageV0.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v0;
+
+import org.apache.seata.core.compressor.CompressorType;
+import org.apache.seata.core.protocol.ProtocolConstants;
+import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
+import org.apache.seata.core.serializer.SerializerType;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * the protocol v0 rpc message
+ **/
+public class ProtocolRpcMessageV0 implements ProtocolRpcMessage {
+
+    private static AtomicLong NEXT_ID = new AtomicLong(0);
+
+    /**
+     * Gets next message id.
+     *
+     * @return the next message id
+     */
+    public static long getNextMessageId() {
+        return NEXT_ID.incrementAndGet();
+    }
+
+    private long id;
+    private boolean isAsync;
+    private boolean isRequest;
+    private boolean isHeartbeat;
+    private Object body;
+    private byte messageType;
+    private boolean isSeataCodec;
+
+    /**
+     * Gets id.
+     *
+     * @return the id
+     */
+    public long getId() {
+        return id;
+    }
+
+    /**
+     * Sets id.
+     *
+     * @param id the id
+     */
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    /**
+     * Is async boolean.
+     *
+     * @return the boolean
+     */
+    public boolean isAsync() {
+        return isAsync;
+    }
+
+    /**
+     * Sets async.
+     *
+     * @param async the async
+     */
+    public void setAsync(boolean async) {
+        isAsync = async;
+    }
+
+    /**
+     * Is request boolean.
+     *
+     * @return the boolean
+     */
+    public boolean isRequest() {
+        return isRequest;
+    }
+
+    /**
+     * Sets request.
+     *
+     * @param request the request
+     */
+    public void setRequest(boolean request) {
+        isRequest = request;
+    }
+
+    /**
+     * Is heartbeat boolean.
+     *
+     * @return the boolean
+     */
+    public boolean isHeartbeat() {
+        return isHeartbeat;
+    }
+
+    /**
+     * Sets heartbeat.
+     *
+     * @param heartbeat the heartbeat
+     */
+    public void setHeartbeat(boolean heartbeat) {
+        isHeartbeat = heartbeat;
+    }
+
+    /**
+     * Gets body.
+     *
+     * @return the body
+     */
+    public Object getBody() {
+        return body;
+    }
+
+    /**
+     * Sets body.
+     *
+     * @param body the body
+     */
+    public void setBody(Object body) {
+        this.body = body;
+    }
+
+    public boolean isSeataCodec() {
+        return isSeataCodec;
+    }
+
+    public void setSeataCodec(boolean seataCodec) {
+        isSeataCodec = seataCodec;
+    }
+
+    public byte getMessageType() {
+        return messageType;
+    }
+
+    public void setMessageType(byte messageType) {
+        this.messageType = messageType;
+    }
+
+    @Override
+    public RpcMessage protocolMsg2RpcMsg() {
+        RpcMessage rpcMessage = new RpcMessage();
+        rpcMessage.setMessageType(this.messageType);
+        rpcMessage.setCompressor(CompressorType.NONE.getCode());
+
+        byte codecType = this.isSeataCodec ? SerializerType.SEATA.getCode() : 
SerializerType.HESSIAN.getCode();
+        rpcMessage.setCodec(codecType);
+
+        if (this.isHeartbeat) {
+            if (this.isRequest) {
+                
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST);
+            } else {
+                
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE);
+            }
+        } else {
+            if (this.isRequest) {
+                
rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY);
+            } else {
+                rpcMessage.setMessageType(ProtocolConstants.MSGTYPE_RESPONSE);
+            }
+        }
+        rpcMessage.setBody(this.body);
+        rpcMessage.setId((int) this.id);
+        return rpcMessage;
+    }
+
+    @Override
+    public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) {
+        this.body = rpcMessage.getBody();
+        this.id = rpcMessage.getId();
+        this.isRequest = isRequest(rpcMessage.getMessageType());
+        this.isHeartbeat = isHeartbeat(rpcMessage.getMessageType());
+        this.isSeataCodec = rpcMessage.getCodec() == 
SerializerType.SEATA.getCode();
+        this.messageType = rpcMessage.getMessageType();
+    }
+
+    private boolean isHeartbeat(byte msgType) {
+        return msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
+                || msgType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE;
+    }
+
+    private boolean isRequest(byte msgType) {
+        return msgType == ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY
+                || msgType == ProtocolConstants.MSGTYPE_RESQUEST_SYNC;
+    }
+}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java
 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java
new file mode 100644
index 0000000000..10668cbdc6
--- /dev/null
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolRpcMessageV1.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.core.rpc.netty.v1;
+
+import org.apache.seata.common.util.StringUtils;
+import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * protocol v1 rpc message
+ **/
+public class ProtocolRpcMessageV1 implements ProtocolRpcMessage {
+    private int id;
+    private byte messageType;
+    private byte codec;
+    private byte compressor;
+    private Map<String, String> headMap = new HashMap<>();
+    private Object body;
+
+    /**
+     * Gets id.
+     *
+     * @return the id
+     */
+    public int getId() {
+        return id;
+    }
+
+    /**
+     * Sets id.
+     *
+     * @param id the id
+     */
+    public void setId(int id) {
+        this.id = id;
+    }
+
+    /**
+     * Gets body.
+     *
+     * @return the body
+     */
+    public Object getBody() {
+        return body;
+    }
+
+    /**
+     * Sets body.
+     *
+     * @param body the body
+     */
+    public void setBody(Object body) {
+        this.body = body;
+    }
+
+    /**
+     * Gets codec.
+     *
+     * @return the codec
+     */
+    public byte getCodec() {
+        return codec;
+    }
+
+    /**
+     * Sets codec.
+     *
+     * @param codec the codec
+     * @return the codec
+     */
+    public void setCodec(byte codec) {
+        this.codec = codec;
+    }
+
+    /**
+     * Gets compressor.
+     *
+     * @return the compressor
+     */
+    public byte getCompressor() {
+        return compressor;
+    }
+
+    /**
+     * Sets compressor.
+     *
+     * @param compressor the compressor
+     * @return the compressor
+     */
+    public void setCompressor(byte compressor) {
+        this.compressor = compressor;
+    }
+
+    /**
+     * Gets head map.
+     *
+     * @return the head map
+     */
+    public Map<String, String> getHeadMap() {
+        return headMap;
+    }
+
+    /**
+     * Sets head map.
+     *
+     * @param headMap the head map
+     * @return the head map
+     */
+    public void setHeadMap(Map<String, String> headMap) {
+        this.headMap = headMap;
+    }
+
+    /**
+     * Gets head.
+     *
+     * @param headKey the head key
+     * @return the head
+     */
+    public String getHead(String headKey) {
+        return headMap.get(headKey);
+    }
+
+    /**
+     * Put head.
+     *
+     * @param headKey   the head key
+     * @param headValue the head value
+     */
+    public void putHead(String headKey, String headValue) {
+        headMap.put(headKey, headValue);
+    }
+
+    /**
+     * Gets message type.
+     *
+     * @return the message type
+     */
+    public byte getMessageType() {
+        return messageType;
+    }
+
+    /**
+     * Sets message type.
+     *
+     * @param messageType the message type
+     */
+    public void setMessageType(byte messageType) {
+        this.messageType = messageType;
+    }
+
+    @Override
+    public String toString() {
+        return StringUtils.toString(this);
+    }
+
+    @Override
+    public RpcMessage protocolMsg2RpcMsg() {
+        RpcMessage rpcMessage = new RpcMessage();
+        rpcMessage.setId(this.id);
+        rpcMessage.setMessageType(this.messageType);
+        rpcMessage.setCodec(this.codec);
+        rpcMessage.setCompressor(this.compressor);
+        rpcMessage.setHeadMap(this.headMap);
+        rpcMessage.setBody(this.body);
+        return rpcMessage;
+    }
+
+
+    @Override
+    public void rpcMsg2ProtocolMsg(RpcMessage rpcMessage) {
+        this.body = rpcMessage.getBody();
+        this.headMap = rpcMessage.getHeadMap();
+        this.id = rpcMessage.getId();
+        this.messageType = rpcMessage.getMessageType();
+        this.codec = rpcMessage.getCodec();
+        this.compressor = rpcMessage.getCompressor();
+    }
+}
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
index eb65fd20cf..26ef52ffdc 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Decoder.java
@@ -27,7 +27,6 @@ import org.apache.seata.core.compressor.CompressorFactory;
 import org.apache.seata.core.exception.DecodeException;
 import org.apache.seata.core.protocol.HeartbeatMessage;
 import org.apache.seata.core.protocol.ProtocolConstants;
-import org.apache.seata.core.protocol.RpcMessage;
 import org.apache.seata.core.serializer.Serializer;
 import org.apache.seata.core.serializer.SerializerServiceLoader;
 import org.apache.seata.core.serializer.SerializerType;
@@ -125,7 +124,7 @@ public class ProtocolV1Decoder extends 
LengthFieldBasedFrameDecoder {
         byte compressorType = frame.readByte();
         int requestId = frame.readInt();
 
-        RpcMessage rpcMessage = new RpcMessage();
+        ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
         rpcMessage.setCodec(codecType);
         rpcMessage.setId(requestId);
         rpcMessage.setCompressor(compressorType);
diff --git 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
index 7c32e98f2d..575992fd87 100644
--- 
a/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
+++ 
b/core/src/main/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1Encoder.java
@@ -66,8 +66,10 @@ public class ProtocolV1Encoder extends MessageToByteEncoder {
         try {
 
             if (msg instanceof RpcMessage) {
-                RpcMessage rpcMessage = (RpcMessage) msg;
+                RpcMessage rpcMsg = (RpcMessage) msg;
 
+                ProtocolRpcMessageV1 rpcMessage = new ProtocolRpcMessageV1();
+                rpcMessage.rpcMsg2ProtocolMsg(rpcMsg);
                 int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
                 int headLength = ProtocolConstants.V1_HEAD_LENGTH;
 
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
index 98b9bd7964..e35c124e30 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ClientChannelHandler.java
@@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.util.concurrent.DefaultPromise;
 import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,8 +50,8 @@ public class ClientChannelHandler extends 
ChannelInboundHandlerAdapter {
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-        if (msg instanceof RpcMessage) {
-            RpcMessage rpcMessage = (RpcMessage) msg;
+        if (msg instanceof ProtocolRpcMessage) {
+            RpcMessage rpcMessage = ((ProtocolRpcMessage) 
msg).protocolMsg2RpcMsg();
             int msgId = rpcMessage.getId();
             DefaultPromise future = (DefaultPromise) 
client.futureMap.remove(msgId);
             if (future != null) {
@@ -58,7 +59,10 @@ public class ClientChannelHandler extends 
ChannelInboundHandlerAdapter {
             } else {
                 LOGGER.warn("miss msg id:{}", msgId);
             }
+        }else {
+            LOGGER.warn("msg is not ProtocolRpcMessage");
         }
+
     }
 
     @Override
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
index 31025304a1..5ee0df7dee 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ProtocolV1SerializerTest.java
@@ -27,8 +27,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.seata.common.thread.NamedThreadFactory;
 import org.apache.seata.core.model.BranchType;
-import org.apache.seata.core.protocol.RpcMessage;
 import org.apache.seata.core.protocol.transaction.BranchCommitRequest;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -80,7 +80,7 @@ public class ProtocolV1SerializerTest {
                     while (tag.getAndIncrement() < runTimes) {
                         try {
                             Future future = client.sendRpc(head, body);
-                            RpcMessage resp = (RpcMessage) future.get(10, 
TimeUnit.SECONDS);
+                            ProtocolRpcMessage resp = (ProtocolRpcMessage) 
future.get(10, TimeUnit.SECONDS);
                             if (resp != null) {
                                 success.incrementAndGet();
                             }
diff --git 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
index 58745a41ad..8b468d0e8f 100644
--- 
a/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
+++ 
b/test/src/test/java/org/apache/seata/core/rpc/netty/v1/ServerChannelHandler.java
@@ -22,6 +22,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import org.apache.seata.core.protocol.ProtocolConstants;
 import org.apache.seata.core.protocol.RpcMessage;
+import org.apache.seata.core.rpc.netty.ProtocolRpcMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,11 +40,12 @@ public class ServerChannelHandler extends 
ChannelInboundHandlerAdapter {
     public void channelRead(ChannelHandlerContext ctx, Object msg) {
         Channel channel = ctx.channel();
 
-        if (msg instanceof RpcMessage) {
-            ((RpcMessage) 
msg).setMessageType(ProtocolConstants.MSGTYPE_RESPONSE);
+        if (msg instanceof ProtocolRpcMessage) {
+            RpcMessage rpcMessage = ((ProtocolRpcMessage) 
msg).protocolMsg2RpcMsg();
+            channel.writeAndFlush(rpcMessage);
+        } else {
+            LOGGER.error("rpcMessage type error");
         }
-
-        channel.writeAndFlush(msg);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to